Compare commits

...

22 Commits

Author SHA1 Message Date
Chris Eager
8161f55a82 Add dynamic configuration for setting Dynamo as primary 2021-09-17 13:28:45 -07:00
Chris Eager
ecee189ad8 Add AccountDatabaseCrawler.dedicatedDynamoMigrationCrawler 2021-09-17 11:27:20 -07:00
Jon Chambers
ef0900f3ac Add .tx/ to .gitignore 2021-09-17 13:43:52 -04:00
Fedor Indutny
383d744bd8 Log the error message when retrying queue send 2021-09-16 18:03:42 -04:00
Jon Chambers
c2ba8ab562 Identify receipt destinations by UUID instead of e164 2021-09-16 10:47:03 -04:00
Chris Eager
cd49ea43c0 Use queryPaginator when loading messages 2021-09-16 10:46:37 -04:00
Chris Eager
53aa45a2bb Use queryPaginator when deleting messages 2021-09-16 10:46:37 -04:00
Chris Eager
83e0a19561 Migrate MessagesDynamoDbRule to MessagesDynamoDbExtension 2021-09-16 10:46:37 -04:00
Jon Chambers
6a5d475198 Add a "refresh websocket on number change" provider 2021-09-16 10:37:34 -04:00
Jon Chambers
49ccbba2e3 Generalize the "watch for websockets that need to be refreshed" listener 2021-09-16 10:37:34 -04:00
Fedor Indutnyy
41735ed40e Introduce queueDrainRetry counter metric 2021-09-16 10:30:19 -04:00
Ehren Kret
2d11a433c9 Wrap all calls to getAcceptableLanguages
ContainerRequestContext#getAcceptableLanguages throws a
ProcessingException if the header has invalid values in it. Rather than
error out of the request entirely with the exception handler for that
exception, we just treat it as though no Accept-Languages header was
specified.
2021-09-16 09:28:21 -05:00
Ehren Kret
e79ab2521f Rename field in ConfiguredProfileBadgeConverter 2021-09-16 09:28:21 -05:00
Ehren Kret
fb1f99da87 Add a method to enable a badge for all accounts 2021-09-16 09:28:21 -05:00
Ehren Kret
08c6a8c2e5 Add category to badges 2021-09-16 09:28:21 -05:00
Ehren Kret
ce3835e176 Rename id to name in the configuration 2021-09-16 09:28:21 -05:00
Ehren Kret
39f6eadbb9 Add test for add and remove badges 2021-09-16 09:28:21 -05:00
Ehren Kret
16dba09b61 Handle merging badges when adding to account 2021-09-16 09:28:21 -05:00
Ehren Kret
d5ebf2f2ed Rename name to id in Account#removeBadge 2021-09-16 09:28:21 -05:00
Ehren Kret
8a8e6e7b49 Rename name to id in the stored badge information and expose id in the profile endpoint 2021-09-16 09:28:21 -05:00
Ehren Kret
34e21b9f7b Change name to id on AccountBadge
This makes it distinct from the localized name field on the Badge
entity that is returned.
2021-09-16 09:28:21 -05:00
Ehren Kret
98a31d1474 Switch ProfileController to the actual badge converter 2021-09-16 09:28:21 -05:00
42 changed files with 1320 additions and 785 deletions

1
.gitignore vendored
View File

@@ -22,3 +22,4 @@ deployer-staging.properties
deployer-production.properties
deployer.log
/service/src/main/resources/org/signal/badges/Badges_*.properties
.tx/

View File

@@ -261,5 +261,8 @@ donation:
badges:
badges:
- name: TEST
- id: TEST
imageUrl: https://example.com/test-badge
category: other
badgeIdsEnabledForAll:
- TEST

View File

@@ -29,7 +29,6 @@ import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import io.lettuce.core.resource.ClientResources;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Meter.Id;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
@@ -37,6 +36,7 @@ import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.datadog.DatadogMeterRegistry;
import java.net.http.HttpClient;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -62,13 +62,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.DispatchManager;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.auth.AuthEnablementApplicationEventListener;
import org.whispersystems.textsecuregcm.auth.WebsocketRefreshApplicationEventListener;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.auth.CertificateGenerator;
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccountAuthenticator;
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAuthenticatedAccount;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator;
import org.whispersystems.textsecuregcm.auth.TurnTokenGenerator;
import org.whispersystems.textsecuregcm.badges.ConfiguredProfileBadgeConverter;
import org.whispersystems.textsecuregcm.badges.ProfileBadgeConverter;
import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration;
import org.whispersystems.textsecuregcm.controllers.AccountController;
@@ -269,7 +270,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.build();
{
final DatadogMeterRegistry datadogMeterRegistry = new DatadogMeterRegistry(config.getDatadogConfiguration(), Clock.SYSTEM);
final DatadogMeterRegistry datadogMeterRegistry = new DatadogMeterRegistry(
config.getDatadogConfiguration(), io.micrometer.core.instrument.Clock.SYSTEM);
datadogMeterRegistry.config().commonTags(
Tags.of(
@@ -295,7 +297,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.getObjectMapper().setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
environment.getObjectMapper().setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
ProfileBadgeConverter profileBadgeConverter = (acceptableLanguages, accountBadges) -> List.of(); // TODO: Provide an actual implementation.
ProfileBadgeConverter profileBadgeConverter = new ConfiguredProfileBadgeConverter(
Clock.systemUTC(), config.getBadges());
JdbiFactory jdbiFactory = new JdbiFactory(DefaultNameStrategy.CHECK_EMPTY);
Jdbi accountJdbi = jdbiFactory.build(environment, config.getAccountsDatabaseConfiguration(), "accountdb");
@@ -549,6 +552,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getDynamoDbMigrationCrawlerConfiguration().getChunkIntervalMs(),
accountsCrawlerChunkPreReadExecutor,
dynamicConfigurationManager);
accountDynamoDbMigrationCrawler.setDedicatedDynamoMigrationCrawler(true);
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
MigrationRetryAccountsTableCrawler migrationRetryAccountsTableCrawler = new MigrationRetryAccountsTableCrawler(
@@ -611,7 +615,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DisabledPermittedAuthenticatedAccount.class, disabledPermittedAccountAuthFilter)));
environment.jersey().register(new PolymorphicAuthValueFactoryProvider.Binder<>(
ImmutableSet.of(AuthenticatedAccount.class, DisabledPermittedAuthenticatedAccount.class)));
environment.jersey().register(new AuthEnablementApplicationEventListener(clientPresenceManager));
environment.jersey().register(new WebsocketRefreshApplicationEventListener(clientPresenceManager));
environment.jersey().register(new TimestampResponseFilter());
environment.jersey().register(new VoiceVerificationController(config.getVoiceVerificationConfiguration().getUrl(),
config.getVoiceVerificationConfiguration().getLocales()));
@@ -623,7 +627,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
webSocketEnvironment.setConnectListener(
new AuthenticatedConnectListener(receiptSender, messagesManager, messageSender, apnFallbackManager,
clientPresenceManager, retrySchedulingExecutor));
webSocketEnvironment.jersey().register(new AuthEnablementApplicationEventListener(clientPresenceManager));
webSocketEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(clientPresenceManager));
webSocketEnvironment.jersey().register(new ContentLengthFilter(TrafficSource.WEBSOCKET));
webSocketEnvironment.jersey().register(MultiRecipientMessageProvider.class);
webSocketEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET));
@@ -665,7 +669,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
WebSocketEnvironment<AuthenticatedAccount> provisioningEnvironment = new WebSocketEnvironment<>(environment,
webSocketEnvironment.getRequestLog(), 60000);
provisioningEnvironment.jersey().register(new AuthEnablementApplicationEventListener(clientPresenceManager));
provisioningEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(clientPresenceManager));
provisioningEnvironment.setConnectListener(new ProvisioningConnectListener(pubSubManager));
provisioningEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET));
provisioningEnvironment.jersey().register(new KeepAliveController(clientPresenceManager));

View File

@@ -1,33 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.auth;
import org.glassfish.jersey.server.monitoring.ApplicationEvent;
import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
/**
* Delegates request events to a listener that handles auth-enablement changes
*/
public class AuthEnablementApplicationEventListener implements ApplicationEventListener {
private final AuthEnablementRequestEventListener authEnablementRequestEventListener;
public AuthEnablementApplicationEventListener(final ClientPresenceManager clientPresenceManager) {
this.authEnablementRequestEventListener = new AuthEnablementRequestEventListener(clientPresenceManager);
}
@Override
public void onEvent(final ApplicationEvent event) {
}
@Override
public RequestEventListener onRequest(final RequestEvent requestEvent) {
return authEnablementRequestEventListener;
}
}

View File

@@ -0,0 +1,114 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.auth;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.glassfish.jersey.server.ContainerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Pair;
/**
* This {@link WebsocketRefreshRequirementProvider} observes intra-request changes in {@link Account#isEnabled()} and
* {@link Device#isEnabled()}.
* <p>
* If a change in {@link Account#isEnabled()} is observed, then any active WebSocket connections for the account must be
* closed, in order for clients to get a refreshed {@link io.dropwizard.auth.Auth} object.
* <p>
* If a change in {@link Device#isEnabled()} is observed, including deletion of the {@link Device}, then any active
* WebSocket connections for the device must be closed and re-authenticated.
*
* @see AuthenticatedAccount
* @see DisabledPermittedAuthenticatedAccount
*/
public class AuthEnablementRefreshRequirementProvider implements WebsocketRefreshRequirementProvider {
private static final Logger logger = LoggerFactory.getLogger(AuthEnablementRefreshRequirementProvider.class);
private static final String ACCOUNT_ENABLED = AuthEnablementRefreshRequirementProvider.class.getName() + ".accountEnabled";
private static final String DEVICES_ENABLED = AuthEnablementRefreshRequirementProvider.class.getName() + ".devicesEnabled";
@VisibleForTesting
Map<Long, Boolean> buildDevicesEnabledMap(final Account account) {
return account.getDevices().stream()
.collect(() -> new HashMap<>(account.getDevices().size()),
(map, device) -> map.put(device.getId(), device.isEnabled()), HashMap::putAll);
}
@Override
public void handleRequestFiltered(final ContainerRequest request) {
// The authenticated principal, if any, will be available after filters have run.
// Now that the account is known, capture a snapshot of `isEnabled` for the account and its devices,
// before carrying out the requests business logic.
ContainerRequestUtil.getAuthenticatedAccount(request)
.ifPresent(
account -> {
request.setProperty(ACCOUNT_ENABLED, account.isEnabled());
request.setProperty(DEVICES_ENABLED, buildDevicesEnabledMap(account));
});
}
@Override
public List<Pair<UUID, Long>> handleRequestFinished(final ContainerRequest request) {
// Now that the request is finished, check whether `isEnabled` changed for any of the devices, or the account
// as a whole. If the value did change, the affected device(s) must disconnect and reauthenticate.
// If a device was removed, it must also disconnect.
if (request.getProperty(ACCOUNT_ENABLED) != null &&
request.getProperty(DEVICES_ENABLED) != null) {
final boolean accountInitiallyEnabled = (boolean) request.getProperty(ACCOUNT_ENABLED);
@SuppressWarnings("unchecked") final Map<Long, Boolean> initialDevicesEnabled =
(Map<Long, Boolean>) request.getProperty(DEVICES_ENABLED);
return ContainerRequestUtil.getAuthenticatedAccount(request).map(account -> {
final Set<Long> deviceIdsToDisplace;
if (account.isEnabled() != accountInitiallyEnabled) {
// the @Auth for all active connections must change when account.isEnabled() changes
deviceIdsToDisplace = account.getDevices().stream()
.map(Device::getId).collect(Collectors.toSet());
deviceIdsToDisplace.addAll(initialDevicesEnabled.keySet());
} else if (!initialDevicesEnabled.isEmpty()) {
deviceIdsToDisplace = new HashSet<>();
final Map<Long, Boolean> currentDevicesEnabled = buildDevicesEnabledMap(account);
initialDevicesEnabled.forEach((deviceId, enabled) -> {
// `null` indicates the device was removed from the account. Any active presence should be removed.
final boolean enabledMatches = Objects.equals(enabled,
currentDevicesEnabled.getOrDefault(deviceId, null));
if (!enabledMatches) {
deviceIdsToDisplace.add(deviceId);
}
});
} else {
deviceIdsToDisplace = Collections.emptySet();
}
return deviceIdsToDisplace.stream().map(deviceId -> new Pair<>(account.getUuid(), deviceId))
.collect(Collectors.toList());
}).orElseGet(() -> {
logger.error("Request had account, but it is no longer present");
return Collections.emptyList();
});
} else
return Collections.emptyList();
}
}

View File

@@ -1,152 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.auth;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.core.SecurityContext;
import org.glassfish.jersey.server.ContainerRequest;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEvent.Type;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
/**
* This {@link RequestEventListener} observes intra-request changes in {@link Account#isEnabled()} and {@link
* Device#isEnabled()}.
* <p>
* If a change in {@link Account#isEnabled()} is observed, then any active WebSocket connections for the account must be
* closed, in order for clients to get a refreshed {@link io.dropwizard.auth.Auth} object.
* <p>
* If a change in {@link Device#isEnabled()} is observed, including deletion of the {@link Device}, then any active
* WebSocket connections for the device must be closed and re-authenticated.
*
* @see AuthenticatedAccount
* @see DisabledPermittedAuthenticatedAccount
*/
public class AuthEnablementRequestEventListener implements RequestEventListener {
private static final Logger logger = LoggerFactory.getLogger(AuthEnablementRequestEventListener.class);
private static final String ACCOUNT_ENABLED = AuthEnablementRequestEventListener.class.getName() + ".accountEnabled";
private static final String DEVICES_ENABLED = AuthEnablementRequestEventListener.class.getName() + ".devicesEnabled";
private static final Counter DISPLACED_ACCOUNTS = Metrics.counter(
name(AuthEnablementRequestEventListener.class, "displacedAccounts"));
private static final Counter DISPLACED_DEVICES = Metrics.counter(
name(AuthEnablementRequestEventListener.class, "displacedDevices"));
private final ClientPresenceManager clientPresenceManager;
public AuthEnablementRequestEventListener(final ClientPresenceManager clientPresenceManager) {
this.clientPresenceManager = clientPresenceManager;
}
@Override
public void onEvent(final RequestEvent event) {
if (event.getType() == Type.REQUEST_FILTERED) {
// The authenticated principal, if any, will be available after filters have run.
// Now that the account is known, capture a snapshot of `isEnabled` for the account and its devices,
// before carrying out the requests business logic.
findAccount(event.getContainerRequest())
.ifPresent(
account -> {
event.getContainerRequest().setProperty(ACCOUNT_ENABLED, account.isEnabled());
event.getContainerRequest().setProperty(DEVICES_ENABLED, buildDevicesEnabledMap(account));
});
} else if (event.getType() == Type.FINISHED) {
// Now that the request is finished, check whether `isEnabled` changed for any of the devices, or the account
// as a whole. If the value did change, the affected device(s) must disconnect and reauthenticate.
// If a device was removed, it must also disconnect.
if (event.getContainerRequest().getProperty(ACCOUNT_ENABLED) != null &&
event.getContainerRequest().getProperty(DEVICES_ENABLED) != null) {
final boolean accountInitiallyEnabled = (boolean) event.getContainerRequest().getProperty(ACCOUNT_ENABLED);
@SuppressWarnings("unchecked") final Map<Long, Boolean> initialDevicesEnabled = (Map<Long, Boolean>) event.getContainerRequest()
.getProperty(DEVICES_ENABLED);
findAccount(event.getContainerRequest()).ifPresentOrElse(account -> {
final Set<Long> deviceIdsToDisplace;
if (account.isEnabled() != accountInitiallyEnabled) {
// the @Auth for all active connections must change when account.isEnabled() changes
deviceIdsToDisplace = account.getDevices().stream()
.map(Device::getId).collect(Collectors.toSet());
deviceIdsToDisplace.addAll(initialDevicesEnabled.keySet());
DISPLACED_ACCOUNTS.increment();
} else if (!initialDevicesEnabled.isEmpty()) {
deviceIdsToDisplace = new HashSet<>();
final Map<Long, Boolean> currentDevicesEnabled = buildDevicesEnabledMap(account);
initialDevicesEnabled.forEach((deviceId, enabled) -> {
// `null` indicates the device was removed from the account. Any active presence should be removed.
final boolean enabledMatches = Objects.equals(enabled,
currentDevicesEnabled.getOrDefault(deviceId, null));
if (!enabledMatches) {
deviceIdsToDisplace.add(deviceId);
DISPLACED_DEVICES.increment();
}
});
} else {
deviceIdsToDisplace = Collections.emptySet();
}
deviceIdsToDisplace.forEach(deviceId -> {
try {
// displacing presence will cause a reauthorization for the devices active connections
clientPresenceManager.displacePresence(account.getUuid(), deviceId);
} catch (final Exception e) {
logger.error("Could not displace device presence", e);
}
});
},
() -> logger.error("Request had account, but it is no longer present")
);
}
}
}
private Optional<Account> findAccount(final ContainerRequest containerRequest) {
return Optional.ofNullable(containerRequest.getSecurityContext())
.map(SecurityContext::getUserPrincipal)
.map(principal -> {
if (principal instanceof AccountAndAuthenticatedDeviceHolder) {
return ((AccountAndAuthenticatedDeviceHolder) principal).getAccount();
}
return null;
});
}
@VisibleForTesting
Map<Long, Boolean> buildDevicesEnabledMap(final Account account) {
return account.getDevices().stream()
.collect(() -> new HashMap<>(account.getDevices().size()),
(map, device) -> map.put(device.getId(), device.isEnabled()), HashMap::putAll);
}
}

View File

@@ -0,0 +1,21 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.auth;
import org.glassfish.jersey.server.ContainerRequest;
import org.whispersystems.textsecuregcm.storage.Account;
import javax.ws.rs.core.SecurityContext;
import java.util.Optional;
class ContainerRequestUtil {
static Optional<Account> getAuthenticatedAccount(final ContainerRequest request) {
return Optional.ofNullable(request.getSecurityContext())
.map(SecurityContext::getUserPrincipal)
.map(principal -> principal instanceof AccountAndAuthenticatedDeviceHolder
? ((AccountAndAuthenticatedDeviceHolder) principal).getAccount() : null);
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.auth;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.glassfish.jersey.server.ContainerRequest;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.util.Pair;
public class PhoneNumberChangeRefreshRequirementProvider implements WebsocketRefreshRequirementProvider {
private static final String INITIAL_NUMBER_KEY =
PhoneNumberChangeRefreshRequirementProvider.class.getName() + ".initialNumber";
@Override
public void handleRequestFiltered(final ContainerRequest request) {
ContainerRequestUtil.getAuthenticatedAccount(request)
.ifPresent(account -> request.setProperty(INITIAL_NUMBER_KEY, account.getNumber()));
}
@Override
public List<Pair<UUID, Long>> handleRequestFinished(final ContainerRequest request) {
final String initialNumber = (String) request.getProperty(INITIAL_NUMBER_KEY);
if (initialNumber != null) {
final Optional<Account> maybeAuthenticatedAccount = ContainerRequestUtil.getAuthenticatedAccount(request);
return maybeAuthenticatedAccount
.filter(account -> !initialNumber.equals(account.getNumber()))
.map(account -> account.getDevices().stream()
.map(device -> new Pair<>(account.getUuid(), device.getId()))
.collect(Collectors.toList()))
.orElse(Collections.emptyList());
} else {
return Collections.emptyList();
}
}
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.auth;
import org.glassfish.jersey.server.monitoring.ApplicationEvent;
import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
/**
* Delegates request events to a listener that watches for intra-request changes that require websocket refreshes
*/
public class WebsocketRefreshApplicationEventListener implements ApplicationEventListener {
private final WebsocketRefreshRequestEventListener websocketRefreshRequestEventListener;
public WebsocketRefreshApplicationEventListener(final ClientPresenceManager clientPresenceManager) {
this.websocketRefreshRequestEventListener = new WebsocketRefreshRequestEventListener(clientPresenceManager,
new AuthEnablementRefreshRequirementProvider(),
new PhoneNumberChangeRefreshRequirementProvider());
}
@Override
public void onEvent(final ApplicationEvent event) {
}
@Override
public RequestEventListener onRequest(final RequestEvent requestEvent) {
return websocketRefreshRequestEventListener;
}
}

View File

@@ -0,0 +1,69 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.auth;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEvent.Type;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
public class WebsocketRefreshRequestEventListener implements RequestEventListener {
private final ClientPresenceManager clientPresenceManager;
private final WebsocketRefreshRequirementProvider[] providers;
private static final Counter DISPLACED_ACCOUNTS = Metrics.counter(
name(WebsocketRefreshRequestEventListener.class, "displacedAccounts"));
private static final Counter DISPLACED_DEVICES = Metrics.counter(
name(WebsocketRefreshRequestEventListener.class, "displacedDevices"));
private static final Logger logger = LoggerFactory.getLogger(WebsocketRefreshRequestEventListener.class);
public WebsocketRefreshRequestEventListener(
final ClientPresenceManager clientPresenceManager,
final WebsocketRefreshRequirementProvider... providers) {
this.clientPresenceManager = clientPresenceManager;
this.providers = providers;
}
@Override
public void onEvent(final RequestEvent event) {
if (event.getType() == Type.REQUEST_FILTERED) {
for (final WebsocketRefreshRequirementProvider provider : providers) {
provider.handleRequestFiltered(event.getContainerRequest());
}
} else if (event.getType() == Type.FINISHED) {
final AtomicInteger displacedDevices = new AtomicInteger(0);
Arrays.stream(providers)
.flatMap(provider -> provider.handleRequestFinished(event.getContainerRequest()).stream())
.distinct()
.forEach(pair -> {
try {
displacedDevices.incrementAndGet();
clientPresenceManager.displacePresence(pair.first(), pair.second());
} catch (final Exception e) {
logger.error("Could not displace device presence", e);
}
});
if (displacedDevices.get() > 0) {
DISPLACED_ACCOUNTS.increment();
DISPLACED_DEVICES.increment(displacedDevices.get());
}
}
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.auth;
import java.util.List;
import java.util.UUID;
import org.glassfish.jersey.server.ContainerRequest;
import org.whispersystems.textsecuregcm.util.Pair;
/**
* A websocket refresh requirement provider watches for intra-request changes (e.g. to authentication status) that
* require a websocket refresh.
*/
public interface WebsocketRefreshRequirementProvider {
/**
* Processes a request after filters have run and the request has been mapped to a destination controller.
*
* @param request the request to observe
*/
void handleRequestFiltered(ContainerRequest request);
/**
* Processes a request after all normal request handling has been completed.
*
* @param request the request to observe
* @return a list of pairs of account UUID/device ID pairs identifying websockets that need to be refreshed as a
* result of the observed request
*/
List<Pair<UUID, Long>> handleRequestFinished(ContainerRequest request);
}

View File

@@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.badges;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -29,6 +30,7 @@ public class ConfiguredProfileBadgeConverter implements ProfileBadgeConverter {
private final Clock clock;
private final Map<String, BadgeConfiguration> knownBadges;
private final List<String> badgeIdsEnabledForAll;
private final ResourceBundleFactory resourceBundleFactory;
public ConfiguredProfileBadgeConverter(
@@ -44,7 +46,8 @@ public class ConfiguredProfileBadgeConverter implements ProfileBadgeConverter {
final ResourceBundleFactory resourceBundleFactory) {
this.clock = clock;
this.knownBadges = badgesConfiguration.getBadges().stream()
.collect(Collectors.toMap(BadgeConfiguration::getName, Function.identity()));
.collect(Collectors.toMap(BadgeConfiguration::getId, Function.identity()));
this.badgeIdsEnabledForAll = badgesConfiguration.getBadgeIdsEnabledForAll();
this.resourceBundleFactory = resourceBundleFactory;
}
@@ -52,7 +55,7 @@ public class ConfiguredProfileBadgeConverter implements ProfileBadgeConverter {
public List<Badge> convert(
final List<Locale> acceptableLanguages,
final List<AccountBadge> accountBadges) {
if (accountBadges.isEmpty()) {
if (accountBadges.isEmpty() && badgeIdsEnabledForAll.isEmpty()) {
return List.of();
}
@@ -86,13 +89,29 @@ public class ConfiguredProfileBadgeConverter implements ProfileBadgeConverter {
};
final ResourceBundle resourceBundle = resourceBundleFactory.createBundle(BASE_NAME, desiredLocale, control);
return accountBadges.stream()
List<Badge> badges = accountBadges.stream()
.filter(accountBadge -> accountBadge.isVisible()
&& now.isBefore(accountBadge.getExpiration())
&& knownBadges.containsKey(accountBadge.getName()))
.map(accountBadge -> new Badge(knownBadges.get(accountBadge.getName()).getImageUrl(),
resourceBundle.getString(accountBadge.getName() + "_name"),
resourceBundle.getString(accountBadge.getName() + "_description")))
.collect(Collectors.toList());
&& knownBadges.containsKey(accountBadge.getId()))
.map(accountBadge -> {
BadgeConfiguration configuration = knownBadges.get(accountBadge.getId());
return new Badge(
accountBadge.getId(),
configuration.getCategory(),
configuration.getImageUrl(),
resourceBundle.getString(accountBadge.getId() + "_name"),
resourceBundle.getString(accountBadge.getId() + "_description"));
})
.collect(Collectors.toCollection(ArrayList::new));
badges.addAll(badgeIdsEnabledForAll.stream().filter(knownBadges::containsKey).map(id -> {
BadgeConfiguration configuration = knownBadges.get(id);
return new Badge(
id,
configuration.getCategory(),
configuration.getImageUrl(),
resourceBundle.getString(id + "_name"),
resourceBundle.getString(id + "_description"));
}).collect(Collectors.toList()));
return badges;
}
}

View File

@@ -14,20 +14,23 @@ import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
public class BadgeConfiguration {
private final String name;
private final String id;
private final URL imageUrl;
private final String category;
@JsonCreator
public BadgeConfiguration(
@JsonProperty("name") final String name,
@JsonProperty("imageUrl") @JsonDeserialize(converter = URLDeserializationConverter.class) final URL imageUrl) {
this.name = name;
@JsonProperty("id") final String id,
@JsonProperty("imageUrl") @JsonDeserialize(converter = URLDeserializationConverter.class) final URL imageUrl,
@JsonProperty("category") final String category) {
this.id = id;
this.imageUrl = imageUrl;
this.category = category;
}
@NotEmpty
public String getName() {
return name;
public String getId() {
return id;
}
@NotNull
@@ -35,4 +38,9 @@ public class BadgeConfiguration {
public URL getImageUrl() {
return imageUrl;
}
@NotEmpty
public String getCategory() {
return category;
}
}

View File

@@ -16,11 +16,14 @@ import javax.validation.constraints.NotNull;
public class BadgesConfiguration {
private final List<BadgeConfiguration> badges;
private final List<String> badgeIdsEnabledForAll;
@JsonCreator
public BadgesConfiguration(
@JsonProperty("badges") @JsonSetter(nulls = Nulls.AS_EMPTY) final List<BadgeConfiguration> badges) {
@JsonProperty("badges") @JsonSetter(nulls = Nulls.AS_EMPTY) final List<BadgeConfiguration> badges,
@JsonProperty("badgeIdsEnabledForAll") @JsonSetter(nulls = Nulls.AS_EMPTY) final List<String> badgeIdsEnabledForAll) {
this.badges = Objects.requireNonNull(badges);
this.badgeIdsEnabledForAll = Objects.requireNonNull(badgeIdsEnabledForAll);
}
@Valid
@@ -28,4 +31,10 @@ public class BadgesConfiguration {
public List<BadgeConfiguration> getBadges() {
return badges;
}
@Valid
@NotNull
public List<String> getBadgeIdsEnabledForAll() {
return badgeIdsEnabledForAll;
}
}

View File

@@ -5,6 +5,9 @@ import com.google.common.annotations.VisibleForTesting;
public class DynamicAccountsDynamoDbMigrationConfiguration {
@JsonProperty
boolean dynamoPrimary;
@JsonProperty
boolean backgroundMigrationEnabled;
@@ -35,6 +38,10 @@ public class DynamicAccountsDynamoDbMigrationConfiguration {
@JsonProperty
int dynamoCrawlerScanPageSize = 10;
public boolean isDynamoPrimary() {
return dynamoPrimary;
}
public boolean isBackgroundMigrationEnabled() {
return backgroundMigrationEnabled;
}

View File

@@ -498,7 +498,7 @@ public class MessageController {
try {
receiptSender.sendReceipt(
new AuthenticatedAccount(() -> new Pair<>(destination, destination.getMasterDevice().get())),
source.getNumber(), timestamp);
source.getUuid(), timestamp);
} catch (final NoSuchUserException ignored) {
}
}, receiptDelay.toMillis(), TimeUnit.MILLISECONDS);
@@ -583,7 +583,7 @@ public class MessageController {
WebSocketConnection.recordMessageDeliveryDuration(message.get().getTimestamp(), auth.getAuthenticatedDevice());
if (!Util.isEmpty(message.get().getSource())
&& message.get().getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT_VALUE) {
receiptSender.sendReceipt(auth, message.get().getSource(), message.get().getTimestamp());
receiptSender.sendReceipt(auth, message.get().getSourceUuid(), message.get().getTimestamp());
}
}

View File

@@ -4,28 +4,15 @@
*/
package org.whispersystems.textsecuregcm.controllers;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
public class NoSuchUserException extends Exception {
private List<String> missing;
public NoSuchUserException(String user) {
super(user);
missing = new LinkedList<>();
missing.add(user);
}
public NoSuchUserException(List<String> missing) {
this.missing = missing;
public NoSuchUserException(final UUID uuid) {
super(uuid.toString());
}
public NoSuchUserException(Exception e) {
super(e);
}
public List<String> getMissing() {
return missing;
}
}

View File

@@ -10,7 +10,6 @@ import io.dropwizard.auth.Auth;
import java.security.SecureRandom;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
@@ -194,13 +193,8 @@ public class ProfileController {
if (!isZkEnabled) {
throw new WebApplicationException(Response.Status.NOT_FOUND);
}
final List<Locale> acceptableLanguages = new ArrayList<>();
try {
acceptableLanguages.addAll(containerRequestContext.getAcceptableLanguages());
} catch (final ProcessingException e) {
logger.warn("Could not get acceptable languages", e);
}
return getVersionedProfile(auth.map(AuthenticatedAccount::getAccount), accessKey, acceptableLanguages, uuid,
return getVersionedProfile(auth.map(AuthenticatedAccount::getAccount), accessKey,
getAcceptableLanguagesForRequest(containerRequestContext), uuid,
version, Optional.empty());
}
@@ -219,13 +213,8 @@ public class ProfileController {
if (!isZkEnabled) {
throw new WebApplicationException(Response.Status.NOT_FOUND);
}
final List<Locale> acceptableLanguages = new ArrayList<>();
try {
acceptableLanguages.addAll(containerRequestContext.getAcceptableLanguages());
} catch (final ProcessingException e) {
logger.warn("Could not get acceptable languages", e);
}
return getVersionedProfile(auth.map(AuthenticatedAccount::getAccount), accessKey, acceptableLanguages, uuid,
return getVersionedProfile(auth.map(AuthenticatedAccount::getAccount), accessKey,
getAcceptableLanguagesForRequest(containerRequestContext), uuid,
version, Optional.of(credentialRequest));
}
@@ -296,8 +285,11 @@ public class ProfileController {
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/username/{username}")
public Profile getProfileByUsername(@Auth AuthenticatedAccount auth, @PathParam("username") String username)
throws RateLimitExceededException {
public Profile getProfileByUsername(
@Auth AuthenticatedAccount auth,
@Context ContainerRequestContext containerRequestContext,
@PathParam("username") String username)
throws RateLimitExceededException {
rateLimiters.getUsernameLookupLimiter().validate(auth.getAccount().getUuid());
username = username.toLowerCase();
@@ -326,7 +318,7 @@ public class ProfileController {
UserCapabilities.createForAccount(accountProfile.get()),
username,
accountProfile.get().getUuid(),
List.of(),
profileBadgeConverter.convert(getAcceptableLanguagesForRequest(containerRequestContext), accountProfile.get().getBadges()),
null);
}
@@ -367,8 +359,10 @@ public class ProfileController {
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{identifier}")
public Profile getProfile(@Auth Optional<AuthenticatedAccount> auth,
public Profile getProfile(
@Auth Optional<AuthenticatedAccount> auth,
@HeaderParam(OptionalAccess.UNIDENTIFIED) Optional<Anonymous> accessKey,
@Context ContainerRequestContext containerRequestContext,
@HeaderParam("User-Agent") String userAgent,
@PathParam("identifier") UUID identifier,
@QueryParam("ca") boolean useCaCertificate)
@@ -399,7 +393,7 @@ public class ProfileController {
UserCapabilities.createForAccount(accountProfile.get()),
username.orElse(null),
null,
List.of(),
profileBadgeConverter.convert(getAcceptableLanguagesForRequest(containerRequestContext), accountProfile.get().getBadges()),
null);
}
@@ -444,4 +438,13 @@ public class ProfileController {
return "profiles/" + Base64.encodeBase64URLSafeString(object);
}
private List<Locale> getAcceptableLanguagesForRequest(ContainerRequestContext containerRequestContext) {
try {
return containerRequestContext.getAcceptableLanguages();
} catch (final ProcessingException e) {
logger.warn("Could not get acceptable languages", e);
return List.of();
}
}
}

View File

@@ -11,20 +11,34 @@ import java.net.URL;
import java.util.Objects;
public class Badge {
private final String id;
private final String category;
private final URL imageUrl;
private final String name;
private final String description;
@JsonCreator
public Badge(
@JsonProperty("id") final String id,
@JsonProperty("category") final String category,
@JsonProperty("imageUrl") final URL imageUrl,
@JsonProperty("name") final String name,
@JsonProperty("description") final String description) {
this.id = id;
this.category = category;
this.imageUrl = imageUrl;
this.name = name;
this.description = description;
}
public String getId() {
return id;
}
public String getCategory() {
return category;
}
public URL getImageUrl() {
return imageUrl;
}
@@ -46,12 +60,14 @@ public class Badge {
return false;
}
Badge badge = (Badge) o;
return Objects.equals(imageUrl, badge.imageUrl) && Objects.equals(name,
badge.name) && Objects.equals(description, badge.description);
return Objects.equals(id, badge.id) && Objects.equals(category,
badge.category) && Objects.equals(imageUrl, badge.imageUrl)
&& Objects.equals(name, badge.name) && Objects.equals(description,
badge.description);
}
@Override
public int hashCode() {
return Objects.hash(imageUrl, name, description);
return Objects.hash(id, category, imageUrl, name, description);
}
}

View File

@@ -5,7 +5,7 @@
package org.whispersystems.textsecuregcm.push;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
@@ -22,21 +22,21 @@ public class ReceiptSender {
private static final Logger logger = LoggerFactory.getLogger(ReceiptSender.class);
public ReceiptSender(AccountsManager accountManager,
MessageSender messageSender) {
public ReceiptSender(final AccountsManager accountManager, final MessageSender messageSender) {
this.accountManager = accountManager;
this.messageSender = messageSender;
}
public void sendReceipt(AuthenticatedAccount source, String destination, long messageId)
throws NoSuchUserException {
public void sendReceipt(AuthenticatedAccount source, UUID destinationUuid, long messageId) throws NoSuchUserException {
final Account sourceAccount = source.getAccount();
if (sourceAccount.getNumber().equals(destination)) {
if (sourceAccount.getUuid().equals(destinationUuid)) {
return;
}
Account destinationAccount = getDestinationAccount(destination);
Envelope.Builder message = Envelope.newBuilder()
final Account destinationAccount = accountManager.get(destinationUuid)
.orElseThrow(() -> new NoSuchUserException(destinationUuid));
final Envelope.Builder message = Envelope.newBuilder()
.setServerTimestamp(System.currentTimeMillis())
.setSource(sourceAccount.getNumber())
.setSourceUuid(sourceAccount.getUuid().toString())
@@ -51,22 +51,9 @@ public class ReceiptSender {
for (final Device destinationDevice : destinationAccount.getDevices()) {
try {
messageSender.sendMessage(destinationAccount, destinationDevice, message.build(), false);
} catch (NotPushRegisteredException e) {
} catch (final NotPushRegisteredException e) {
logger.info("User no longer push registered for delivery receipt: " + e.getMessage());
}
}
}
private Account getDestinationAccount(String destination)
throws NoSuchUserException
{
Optional<Account> account = accountManager.get(destination);
if (account.isEmpty()) {
throw new NoSuchUserException(destination);
}
return account.get();
}
}

View File

@@ -321,23 +321,39 @@ public class Account {
return badges;
}
public void addBadge(AccountBadge badge) {
public void addBadge(Clock clock, AccountBadge badge) {
requireNotStale();
badges.add(badge);
purgeStaleBadges();
boolean added = false;
for (int i = 0; i < badges.size(); i++) {
AccountBadge badgeInList = badges.get(i);
if (Objects.equals(badgeInList.getId(), badge.getId())) {
if (added) {
badges.remove(i);
i--;
} else {
badges.set(i, badgeInList.mergeWith(badge));
added = true;
}
}
}
if (!added) {
badges.add(badge);
}
purgeStaleBadges(clock);
}
public void removeBadge(String name) {
public void removeBadge(Clock clock, String id) {
requireNotStale();
badges.removeIf(accountBadge -> Objects.equals(accountBadge.getName(), name));
purgeStaleBadges();
badges.removeIf(accountBadge -> Objects.equals(accountBadge.getId(), id));
purgeStaleBadges(clock);
}
private void purgeStaleBadges() {
final Instant now = Clock.systemUTC().instant();
private void purgeStaleBadges(Clock clock) {
final Instant now = clock.instant();
badges.removeIf(accountBadge -> now.isAfter(accountBadge.getExpiration()));
}

View File

@@ -12,22 +12,48 @@ import java.util.Objects;
public class AccountBadge {
private final String name;
private final String id;
private final Instant expiration;
private final boolean visible;
@JsonCreator
public AccountBadge(
@JsonProperty("name") String name,
@JsonProperty("id") String id,
@JsonProperty("expiration") Instant expiration,
@JsonProperty("visible") boolean visible) {
this.name = name;
this.id = id;
this.expiration = expiration;
this.visible = visible;
}
public String getName() {
return name;
/**
* Returns a new AccountBadge that is a merging of the two originals. IDs must match for this operation to make sense.
* The expiration will be the later of the two.
* Visibility will be set if either of the passed in objects is visible.
*/
public AccountBadge mergeWith(AccountBadge other) {
if (!Objects.equals(other.id, id)) {
throw new IllegalArgumentException("merging badges should only take place for same id");
}
final Instant latestExpiration;
if (expiration == null || other.expiration == null) {
latestExpiration = null;
} else if (expiration.isAfter(other.expiration)) {
latestExpiration = expiration;
} else {
latestExpiration = other.expiration;
}
return new AccountBadge(
id,
latestExpiration,
visible || other.visible
);
}
public String getId() {
return id;
}
public Instant getExpiration() {
@@ -47,19 +73,19 @@ public class AccountBadge {
return false;
}
AccountBadge that = (AccountBadge) o;
return visible == that.visible && Objects.equals(name, that.name)
return visible == that.visible && Objects.equals(id, that.id)
&& Objects.equals(expiration, that.expiration);
}
@Override
public int hashCode() {
return Objects.hash(name, expiration, visible);
return Objects.hash(id, expiration, visible);
}
@Override
public String toString() {
return "AccountBadge{" +
"name='" + name + '\'' +
"id='" + id + '\'' +
", expiration=" + expiration +
", visible=" + visible +
'}';

View File

@@ -48,6 +48,9 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
private AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
// temporary to control behavior during the Postgres → Dynamo transition
private boolean dedicatedDynamoMigrationCrawler;
public AccountDatabaseCrawler(AccountsManager accounts,
AccountDatabaseCrawlerCache cache,
List<AccountDatabaseCrawlerListener> listeners,
@@ -128,7 +131,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
try (Timer.Context timer = processChunkTimer.time()) {
final boolean useDynamo = dynamicConfigurationManager.getConfiguration()
final boolean useDynamo = !dedicatedDynamoMigrationCrawler && dynamicConfigurationManager.getConfiguration()
.getAccountsDynamoDbMigrationConfiguration()
.isDynamoCrawlerEnabled();
@@ -212,6 +215,10 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
}
}
public void setDedicatedDynamoMigrationCrawler(final boolean dedicatedDynamoMigrationCrawler) {
this.dedicatedDynamoMigrationCrawler = dedicatedDynamoMigrationCrawler;
}
private synchronized void sleepWhileRunning(long delayMs) {
if (running.get()) Util.wait(this, delayMs);
}

View File

@@ -170,38 +170,29 @@ public class AccountsManager {
final UUID originalUuid = account.getUuid();
boolean freshUser = databaseCreate(account);
boolean freshUser = primaryCreate(account);
// databaseCreate() sometimes updates the UUID, if there was a number conflict.
// for metrics, we want dynamo to run with the same original UUID
// create() sometimes updates the UUID, if there was a number conflict.
// for metrics, we want secondary to run with the same original UUID
final UUID actualUuid = account.getUuid();
try {
if (dynamoWriteEnabled()) {
if (secondaryWriteEnabled()) {
account.setUuid(originalUuid);
runSafelyAndRecordMetrics(() -> dynamoCreate(account), Optional.of(account.getUuid()), freshUser,
(databaseResult, dynamoResult) -> {
runSafelyAndRecordMetrics(() -> secondaryCreate(account), Optional.of(account.getUuid()), freshUser,
(primaryResult, secondaryResult) -> {
if (!account.getUuid().equals(actualUuid)) {
// This is expected towards the beginning of the background migration, as Dynamo wont
// have many accounts available for re-registration
logger.warn("dynamoCreate() did not return correct UUID");
accountsDynamoDb.deleteInvalidMigration(account.getUuid());
return Optional.of("dynamoIncorrectUUID");
}
if (databaseResult.equals(dynamoResult)) {
if (primaryResult.equals(secondaryResult)) {
return Optional.empty();
}
if (dynamoResult) {
return Optional.of("dynamoFreshUser");
if (secondaryResult) {
return Optional.of("secondaryFreshUser");
}
return Optional.of("dbFreshUser");
return Optional.of("primaryFreshUser");
},
"create");
}
@@ -302,14 +293,17 @@ public class AccountsManager {
final UUID uuid = account.getUuid();
updatedAccount = updateWithRetries(account, updater, this::databaseUpdate, () -> databaseGet(uuid).get());
updatedAccount = updateWithRetries(account, updater, this::primaryUpdate, () -> primaryGet(uuid).get());
if (dynamoWriteEnabled()) {
runSafelyAndRecordMetrics(() -> dynamoGet(uuid).map(dynamoAccount -> {
if (secondaryWriteEnabled()) {
runSafelyAndRecordMetrics(() -> secondaryGet(uuid).map(secondaryAccount -> {
try {
return updateWithRetries(dynamoAccount, updater, this::dynamoUpdate, () -> dynamoGet(uuid).get());
return updateWithRetries(secondaryAccount, updater, this::secondaryUpdate, () -> secondaryGet(uuid).get());
} catch (final OptimisticLockRetryLimitExceededException e) {
accountsDynamoDb.putUuidForMigrationRetry(uuid);
if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isDynamoPrimary()) {
accountsDynamoDb.putUuidForMigrationRetry(uuid);
}
throw e;
}
@@ -385,11 +379,11 @@ public class AccountsManager {
Optional<Account> account = redisGet(number);
if (!account.isPresent()) {
account = databaseGet(number);
account = primaryGet(number);
account.ifPresent(value -> redisSet(value));
if (dynamoReadEnabled()) {
runSafelyAndRecordMetrics(() -> dynamoGet(number), Optional.empty(), account, this::compareAccounts,
if (secondaryReadEnabled()) {
runSafelyAndRecordMetrics(() -> secondaryGet(number), Optional.empty(), account, this::compareAccounts,
"getByNumber");
}
}
@@ -403,11 +397,11 @@ public class AccountsManager {
Optional<Account> account = redisGet(uuid);
if (!account.isPresent()) {
account = databaseGet(uuid);
account = primaryGet(uuid);
account.ifPresent(value -> redisSet(value));
if (dynamoReadEnabled()) {
runSafelyAndRecordMetrics(() -> dynamoGet(uuid), Optional.of(uuid), account, this::compareAccounts,
if (secondaryReadEnabled()) {
runSafelyAndRecordMetrics(() -> secondaryGet(uuid), Optional.of(uuid), account, this::compareAccounts,
"getByUuid");
}
}
@@ -453,13 +447,13 @@ public class AccountsManager {
deleteBackupServiceDataFuture.join();
redisDelete(account);
databaseDelete(account);
primaryDelete(account);
if (dynamoDeleteEnabled()) {
if (secondaryDeleteEnabled()) {
try {
dynamoDelete(account);
secondaryDelete(account);
} catch (final Exception e) {
logger.error("Could not delete account {} from dynamo", account.getUuid().toString());
logger.error("Could not delete account {} from secondary", account.getUuid().toString());
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER_NAME, "action", "delete").increment();
}
}
@@ -538,7 +532,82 @@ public class AccountsManager {
private void redisDelete(final Account account) {
try (final Timer.Context ignored = redisDeleteTimer.time()) {
cacheCluster.useCluster(connection -> connection.sync().del(getAccountMapKey(account.getNumber()), getAccountEntityKey(account.getUuid())));
cacheCluster.useCluster(connection -> connection.sync()
.del(getAccountMapKey(account.getNumber()), getAccountEntityKey(account.getUuid())));
}
}
private Optional<Account> primaryGet(String number) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
dynamoGet(number) :
databaseGet(number);
}
private Optional<Account> secondaryGet(String number) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
databaseGet(number) :
dynamoGet(number);
}
private Optional<Account> primaryGet(UUID uuid) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
dynamoGet(uuid) :
databaseGet(uuid);
}
private Optional<Account> secondaryGet(UUID uuid) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
databaseGet(uuid) :
dynamoGet(uuid);
}
private boolean primaryCreate(Account account) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
dynamoCreate(account) :
databaseCreate(account);
}
private boolean secondaryCreate(Account account) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
databaseCreate(account) :
dynamoCreate(account);
}
private void primaryUpdate(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
dynamoUpdate(account);
} else {
databaseUpdate(account);
}
}
private void secondaryUpdate(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
databaseUpdate(account);
} else {
dynamoUpdate(account);
}
}
private void primaryDelete(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
dynamoDelete(account);
} else {
databaseDelete(account);
}
}
private void secondaryDelete(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
databaseDelete(account);
} else {
dynamoDelete(account);
}
}
@@ -582,69 +651,72 @@ public class AccountsManager {
accountsDynamoDb.delete(account.getUuid());
}
private boolean dynamoDeleteEnabled() {
private boolean secondaryDeleteEnabled() {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDeleteEnabled();
}
private boolean dynamoReadEnabled() {
private boolean secondaryReadEnabled() {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isReadEnabled();
}
private boolean dynamoWriteEnabled() {
return dynamoDeleteEnabled()
private boolean secondaryWriteEnabled() {
return secondaryDeleteEnabled()
&& dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isWriteEnabled();
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public Optional<String> compareAccounts(final Optional<Account> maybeDatabaseAccount, final Optional<Account> maybeDynamoAccount) {
public Optional<String> compareAccounts(final Optional<Account> maybePrimaryAccount,
final Optional<Account> maybeSecondaryAccount) {
if (maybeDatabaseAccount.isEmpty() && maybeDynamoAccount.isEmpty()) {
if (maybePrimaryAccount.isEmpty() && maybeSecondaryAccount.isEmpty()) {
return Optional.empty();
}
if (maybeDatabaseAccount.isEmpty()) {
return Optional.of("dbMissing");
if (maybePrimaryAccount.isEmpty()) {
return Optional.of("primaryMissing");
}
if (maybeDynamoAccount.isEmpty()) {
return Optional.of("dynamoMissing");
if (maybeSecondaryAccount.isEmpty()) {
return Optional.of("secondaryMissing");
}
final Account databaseAccount = maybeDatabaseAccount.get();
final Account dynamoAccount = maybeDynamoAccount.get();
final Account primaryAccount = maybePrimaryAccount.get();
final Account secondaryAccount = maybeSecondaryAccount.get();
final int uuidCompare = databaseAccount.getUuid().compareTo(dynamoAccount.getUuid());
final int uuidCompare = primaryAccount.getUuid().compareTo(secondaryAccount.getUuid());
if (uuidCompare != 0) {
return Optional.of("uuid");
}
final int numberCompare = databaseAccount.getNumber().compareTo(dynamoAccount.getNumber());
final int numberCompare = primaryAccount.getNumber().compareTo(secondaryAccount.getNumber());
if (numberCompare != 0) {
return Optional.of("number");
}
if (!Objects.equals(databaseAccount.getIdentityKey(), dynamoAccount.getIdentityKey())) {
if (!Objects.equals(primaryAccount.getIdentityKey(), secondaryAccount.getIdentityKey())) {
return Optional.of("identityKey");
}
if (!Objects.equals(databaseAccount.getCurrentProfileVersion(), dynamoAccount.getCurrentProfileVersion())) {
if (!Objects.equals(primaryAccount.getCurrentProfileVersion(), secondaryAccount.getCurrentProfileVersion())) {
return Optional.of("currentProfileVersion");
}
if (!Objects.equals(databaseAccount.getProfileName(), dynamoAccount.getProfileName())) {
if (!Objects.equals(primaryAccount.getProfileName(), secondaryAccount.getProfileName())) {
return Optional.of("profileName");
}
if (!Objects.equals(databaseAccount.getAvatar(), dynamoAccount.getAvatar())) {
if (!Objects.equals(primaryAccount.getAvatar(), secondaryAccount.getAvatar())) {
return Optional.of("avatar");
}
if (!Objects.equals(databaseAccount.getUnidentifiedAccessKey(), dynamoAccount.getUnidentifiedAccessKey())) {
if (databaseAccount.getUnidentifiedAccessKey().isPresent() && dynamoAccount.getUnidentifiedAccessKey().isPresent()) {
if (!Objects.equals(primaryAccount.getUnidentifiedAccessKey(), secondaryAccount.getUnidentifiedAccessKey())) {
if (primaryAccount.getUnidentifiedAccessKey().isPresent() && secondaryAccount.getUnidentifiedAccessKey()
.isPresent()) {
if (Arrays.compare(databaseAccount.getUnidentifiedAccessKey().get(), dynamoAccount.getUnidentifiedAccessKey().get()) != 0) {
if (Arrays.compare(primaryAccount.getUnidentifiedAccessKey().get(),
secondaryAccount.getUnidentifiedAccessKey().get()) != 0) {
return Optional.of("unidentifiedAccessKey");
}
@@ -653,40 +725,41 @@ public class AccountsManager {
}
}
if (!Objects.equals(databaseAccount.isUnrestrictedUnidentifiedAccess(), dynamoAccount.isUnrestrictedUnidentifiedAccess())) {
if (!Objects.equals(primaryAccount.isUnrestrictedUnidentifiedAccess(),
secondaryAccount.isUnrestrictedUnidentifiedAccess())) {
return Optional.of("unrestrictedUnidentifiedAccess");
}
if (!Objects.equals(databaseAccount.isDiscoverableByPhoneNumber(), dynamoAccount.isDiscoverableByPhoneNumber())) {
if (!Objects.equals(primaryAccount.isDiscoverableByPhoneNumber(), secondaryAccount.isDiscoverableByPhoneNumber())) {
return Optional.of("discoverableByPhoneNumber");
}
if (databaseAccount.getMasterDevice().isPresent() && dynamoAccount.getMasterDevice().isPresent()) {
if (!Objects.equals(databaseAccount.getMasterDevice().get().getSignedPreKey(),
dynamoAccount.getMasterDevice().get().getSignedPreKey())) {
if (primaryAccount.getMasterDevice().isPresent() && secondaryAccount.getMasterDevice().isPresent()) {
if (!Objects.equals(primaryAccount.getMasterDevice().get().getSignedPreKey(),
secondaryAccount.getMasterDevice().get().getSignedPreKey())) {
return Optional.of("masterDeviceSignedPreKey");
}
}
try {
if (!serializedEquals(databaseAccount.getDevices(), dynamoAccount.getDevices())) {
if (!serializedEquals(primaryAccount.getDevices(), secondaryAccount.getDevices())) {
return Optional.of("devices");
}
if (databaseAccount.getVersion() != dynamoAccount.getVersion()) {
if (primaryAccount.getVersion() != secondaryAccount.getVersion()) {
return Optional.of("version");
}
if (databaseAccount.getMasterDevice().isPresent() && dynamoAccount.getMasterDevice().isPresent()) {
if (Math.abs(databaseAccount.getMasterDevice().get().getPushTimestamp() -
dynamoAccount.getMasterDevice().get().getPushTimestamp()) > 60 * 1_000L) {
if (primaryAccount.getMasterDevice().isPresent() && secondaryAccount.getMasterDevice().isPresent()) {
if (Math.abs(primaryAccount.getMasterDevice().get().getPushTimestamp() -
secondaryAccount.getMasterDevice().get().getPushTimestamp()) > 60 * 1_000L) {
// These are generally few milliseconds off, because the setter uses System.currentTimeMillis() internally,
// but we can be more relaxed
return Optional.of("masterDevicePushTimestamp");
}
}
if (!serializedEquals(databaseAccount, dynamoAccount)) {
if (!serializedEquals(primaryAccount, secondaryAccount)) {
return Optional.of("serialization");
}
@@ -698,7 +771,8 @@ public class AccountsManager {
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private <T> void runSafelyAndRecordMetrics(Callable<T> callable, Optional<UUID> maybeUuid, final T databaseResult, final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action) {
private <T> void runSafelyAndRecordMetrics(Callable<T> callable, Optional<UUID> maybeUuid, final T primaryResult,
final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action) {
if (maybeUuid.isPresent()) {
// the only time we dont have a UUID is in getByNumber, which is sufficiently low volume to not be a concern, and
@@ -712,8 +786,8 @@ public class AccountsManager {
try {
final T dynamoResult = callable.call();
compare(databaseResult, dynamoResult, mismatchClassifier, action, maybeUuid);
final T secondaryResult = callable.call();
compare(primaryResult, secondaryResult, mismatchClassifier, action, maybeUuid);
} catch (final Exception e) {
logger.error("Error running " + action + " in Dynamo", e);
@@ -723,15 +797,17 @@ public class AccountsManager {
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private <T> void compare(final T databaseResult, final T dynamoResult, final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action, final Optional<UUID> maybeUUid) {
private <T> void compare(final T primaryResult, final T secondaryResult,
final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action,
final Optional<UUID> maybeUUid) {
DYNAMO_MIGRATION_COMPARISON_COUNTER.increment();
mismatchClassifier.apply(databaseResult, dynamoResult)
mismatchClassifier.apply(primaryResult, secondaryResult)
.ifPresent(mismatchType -> {
final String mismatchDescription = action + ":" + mismatchType;
Metrics.counter(DYNAMO_MIGRATION_MISMATCH_COUNTER_NAME,
"mismatchType", mismatchDescription)
"mismatchType", mismatchDescription)
.increment();
maybeUUid.ifPresent(uuid -> {
@@ -762,10 +838,10 @@ public class AccountsManager {
}
private boolean serializedEquals(final Object database, final Object dynamo) throws JsonProcessingException {
final byte[] databaseSerialized = migrationComparisonMapper.writeValueAsBytes(database);
final byte[] dynamoSerialized = migrationComparisonMapper.writeValueAsBytes(dynamo);
final int serializeCompare = Arrays.compare(databaseSerialized, dynamoSerialized);
private boolean serializedEquals(final Object primary, final Object secondary) throws JsonProcessingException {
final byte[] primarySerialized = migrationComparisonMapper.writeValueAsBytes(primary);
final byte[] secondarySerialized = migrationComparisonMapper.writeValueAsBytes(secondary);
final int serializeCompare = Arrays.compare(primarySerialized, secondarySerialized);
return serializeCompare == 0;
}

View File

@@ -129,8 +129,13 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.limit(numberOfMessagesToFetch)
.build();
List<OutgoingMessageEntity> messageEntities = new ArrayList<>(numberOfMessagesToFetch);
for (Map<String, AttributeValue> message : db().query(queryRequest).items()) {
for (Map<String, AttributeValue> message : db().queryPaginator(queryRequest).items()) {
messageEntities.add(convertItemToOutgoingMessageEntity(message));
if (messageEntities.size() == numberOfMessagesToFetch) {
// queryPaginator() uses limit() as the page size, not as an absolute limit
// …but a page might be smaller than limit, because a page is capped at 1 MB
break;
}
}
return messageEntities;
});
@@ -160,7 +165,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
@Nonnull
private Optional<OutgoingMessageEntity> deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(AttributeValue partitionKey, QueryRequest queryRequest) {
Optional<OutgoingMessageEntity> result = Optional.empty();
for (Map<String, AttributeValue> item : db().query(queryRequest).items()) {
for (Map<String, AttributeValue> item : db().queryPaginator(queryRequest).items()) {
final byte[] rangeKeyValue = item.get(KEY_SORT).b().asByteArray();
DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder()
.tableName(tableName)
@@ -225,7 +230,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
}
private void deleteRowsMatchingQuery(AttributeValue partitionKey, QueryRequest querySpec) {
writeInBatches(db().query(querySpec).items(), (itemBatch) -> deleteItems(partitionKey, itemBatch));
writeInBatches(db().queryPaginator(querySpec).items(), itemBatch -> deleteItems(partitionKey, itemBatch));
}
private void deleteItems(AttributeValue partitionKey, List<Map<String, AttributeValue>> items) {

View File

@@ -73,6 +73,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class, "initialQueueLength");
private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue");
private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain");
private static final String QUEUE_DRAIN_RETRY_COUNTER_NAME = name(WebSocketConnection.class, "queueDrainRetry");
private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement");
private static final String NON_SUCCESS_RESPONSE_COUNTER_NAME = name(WebSocketConnection.class, "clientNonSuccessResponse");
private static final String STATUS_CODE_TAG = "status";
@@ -206,11 +207,13 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
if (!message.hasSource()) return;
try {
receiptSender.sendReceipt(auth, message.getSource(), message.getTimestamp());
receiptSender.sendReceipt(auth, UUID.fromString(message.getSourceUuid()), message.getTimestamp());
} catch (NoSuchUserException e) {
logger.info("No longer registered " + e.getMessage());
logger.info("No longer registered: {}", e.getMessage());
} catch (WebApplicationException e) {
logger.warn("Bad federated response for receipt: " + e.getResponse().getStatus());
logger.warn("Bad federated response for receipt: {}", e.getResponse().getStatus());
} catch (IllegalArgumentException e) {
logger.error("Could not parse UUID: {}", message.getSourceUuid());
}
}
@@ -255,9 +258,15 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
processStoredMessages();
}
} else {
logger.debug("Failed to clear queue", cause);
if (consecutiveRetries.incrementAndGet() > MAX_CONSECUTIVE_RETRIES) {
client.close(1011, "Failed to retrieve messages");
} else {
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()));
Metrics.counter(QUEUE_DRAIN_RETRY_COUNTER_NAME, tags).increment();
final long delay = RETRY_DELAY_MILLIS + random.nextInt(RETRY_DELAY_JITTER_MILLIS);
retryFuture.set(retrySchedulingExecutor.schedule(this::processStoredMessages, delay, TimeUnit.MILLISECONDS));
}

View File

@@ -85,14 +85,14 @@ import org.whispersystems.websocket.messages.protobuf.SubProtocol;
import org.whispersystems.websocket.session.WebSocketSessionContextValueFactoryProvider;
@ExtendWith(DropwizardExtensionsSupport.class)
class AuthEnablementRequestEventListenerTest {
class AuthEnablementRefreshRequirementProviderTest {
private final ApplicationEventListener applicationEventListener = mock(ApplicationEventListener.class);
private Account account = new Account();
private final Account account = new Account();
private Device authenticatedDevice = DevicesHelper.createDevice(1L);
private Supplier<Optional<TestPrincipal>> principalSupplier = () -> Optional.of(
private final Supplier<Optional<TestPrincipal>> principalSupplier = () -> Optional.of(
new TestPrincipal("test", account, authenticatedDevice));
private final ResourceExtension resources = ResourceExtension.builder()
@@ -109,12 +109,15 @@ class AuthEnablementRequestEventListenerTest {
private ClientPresenceManager clientPresenceManager;
private AuthEnablementRequestEventListener listener;
private AuthEnablementRefreshRequirementProvider provider;
@BeforeEach
void setup() {
clientPresenceManager = mock(ClientPresenceManager.class);
listener = new AuthEnablementRequestEventListener(clientPresenceManager);
provider = new AuthEnablementRefreshRequirementProvider();
final WebsocketRefreshRequestEventListener listener =
new WebsocketRefreshRequestEventListener(clientPresenceManager, provider);
when(applicationEventListener.onRequest(any())).thenReturn(listener);
final UUID uuid = UUID.randomUUID();
@@ -146,7 +149,7 @@ class AuthEnablementRequestEventListenerTest {
devices.add(device);
});
final Map<Long, Boolean> devicesEnabled = listener.buildDevicesEnabledMap(account);
final Map<Long, Boolean> devicesEnabled = provider.buildDevicesEnabledMap(account);
assertEquals(4, devicesEnabled.size());
@@ -372,7 +375,7 @@ class AuthEnablementRequestEventListenerTest {
}
@ParameterizedTest
@MethodSource("org.whispersystems.textsecuregcm.auth.AuthEnablementRequestEventListenerTest#testAccountEnabledChanged")
@MethodSource("org.whispersystems.textsecuregcm.auth.AuthEnablementRefreshRequirementProviderTest#testAccountEnabledChanged")
void testAccountEnabledChangedWebSocket(final long authenticatedDeviceId, final boolean initialEnabled,
final boolean finalEnabled) throws Exception {

View File

@@ -0,0 +1,107 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.auth;
import org.glassfish.jersey.server.ContainerRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Pair;
import javax.annotation.Nullable;
import javax.ws.rs.core.SecurityContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class PhoneNumberChangeRefreshRequirementProviderTest {
private PhoneNumberChangeRefreshRequirementProvider provider;
private Account account;
private ContainerRequest request;
private static final UUID ACCOUNT_UUID = UUID.randomUUID();
private static final String NUMBER = "+18005551234";
private static final String CHANGED_NUMBER = "+18005554321";
@BeforeEach
void setUp() {
provider = new PhoneNumberChangeRefreshRequirementProvider();
account = mock(Account.class);
final Device device = mock(Device.class);
when(account.getUuid()).thenReturn(ACCOUNT_UUID);
when(account.getNumber()).thenReturn(NUMBER);
when(account.getDevices()).thenReturn(Set.of(device));
when(device.getId()).thenReturn(Device.MASTER_ID);
request = mock(ContainerRequest.class);
final Map<String, Object> requestProperties = new HashMap<>();
doAnswer(invocation -> {
requestProperties.put(invocation.getArgument(0, String.class), invocation.getArgument(1));
return null;
}).when(request).setProperty(anyString(), any());
when(request.getProperty(anyString())).thenAnswer(
invocation -> requestProperties.get(invocation.getArgument(0, String.class)));
}
@Test
void handleRequestNoChange() {
setAuthenticatedAccount(request, account);
provider.handleRequestFiltered(request);
assertEquals(Collections.emptyList(), provider.handleRequestFinished(request));
}
@Test
void handleRequestNumberChange() {
setAuthenticatedAccount(request, account);
provider.handleRequestFiltered(request);
when(account.getNumber()).thenReturn(CHANGED_NUMBER);
assertEquals(List.of(new Pair<>(ACCOUNT_UUID, Device.MASTER_ID)), provider.handleRequestFinished(request));
}
@Test
void handleRequestNoAuthenticatedAccount() {
final ContainerRequest request = mock(ContainerRequest.class);
setAuthenticatedAccount(request, null);
provider.handleRequestFiltered(request);
assertEquals(Collections.emptyList(), provider.handleRequestFinished(request));
}
private void setAuthenticatedAccount(final ContainerRequest mockRequest, @Nullable final Account account) {
final SecurityContext securityContext = mock(SecurityContext.class);
when(mockRequest.getSecurityContext()).thenReturn(securityContext);
if (account != null) {
final AuthenticatedAccount authenticatedAccount = mock(AuthenticatedAccount.class);
when(securityContext.getUserPrincipal()).thenReturn(authenticatedAccount);
when(authenticatedAccount.getAccount()).thenReturn(account);
} else {
when(securityContext.getUserPrincipal()).thenReturn(null);
}
}
}

View File

@@ -51,7 +51,7 @@ public class ConfiguredProfileBadgeConverterTest {
when(clock.instant()).thenReturn(Instant.ofEpochSecond(42));
}
private static String nameFor(int i) {
private static String idFor(int i) {
return "Badge-" + i;
}
@@ -63,16 +63,16 @@ public class ConfiguredProfileBadgeConverterTest {
}
}
private static String rbNameFor(int i) {
private static String nameFor(int i) {
return "TRANSLATED NAME " + i;
}
private static String rbDesriptionFor(int i) {
private static String desriptionFor(int i) {
return "TRANSLATED DESCRIPTION " + i;
}
private static BadgeConfiguration newBadge(int i) {
return new BadgeConfiguration(nameFor(i), imageUrlFor(i));
return new BadgeConfiguration(idFor(i), imageUrlFor(i), "other");
}
private BadgesConfiguration createBadges(int count) {
@@ -80,8 +80,8 @@ public class ConfiguredProfileBadgeConverterTest {
Object[][] objects = new Object[count * 2][2];
for (int i = 0; i < count; i++) {
badges.add(newBadge(i));
objects[(i * 2)] = new Object[]{nameFor(i) + "_name", rbNameFor(i)};
objects[(i * 2) + 1] = new Object[]{nameFor(i) + "_description", rbDesriptionFor(i)};
objects[(i * 2)] = new Object[]{idFor(i) + "_name", nameFor(i)};
objects[(i * 2) + 1] = new Object[]{idFor(i) + "_description", desriptionFor(i)};
}
resourceBundle = new ListResourceBundle() {
@Override
@@ -89,12 +89,12 @@ public class ConfiguredProfileBadgeConverterTest {
return objects;
}
};
return new BadgesConfiguration(badges);
return new BadgesConfiguration(badges, List.of());
}
private BadgeConfiguration getBadge(BadgesConfiguration badgesConfiguration, int i) {
return badgesConfiguration.getBadges().stream()
.filter(badgeConfiguration -> nameFor(i).equals(badgeConfiguration.getName()))
.filter(badgeConfiguration -> idFor(i).equals(badgeConfiguration.getId()))
.findFirst().orElse(null);
}
@@ -137,14 +137,14 @@ public class ConfiguredProfileBadgeConverterTest {
Instant expired = Instant.ofEpochSecond(41);
Instant notExpired = Instant.ofEpochSecond(43);
return Stream.of(
arguments(nameFor(0), expired, false, null),
arguments(nameFor(0), notExpired, false, null),
arguments(nameFor(0), expired, true, null),
arguments(nameFor(0), notExpired, true, new Badge(imageUrlFor(0), rbNameFor(0), rbDesriptionFor(0))),
arguments(nameFor(1), expired, false, null),
arguments(nameFor(1), notExpired, false, null),
arguments(nameFor(1), expired, true, null),
arguments(nameFor(1), notExpired, true, null)
arguments(idFor(0), expired, false, null),
arguments(idFor(0), notExpired, false, null),
arguments(idFor(0), expired, true, null),
arguments(idFor(0), notExpired, true, new Badge(idFor(0), "other", imageUrlFor(0), nameFor(0), desriptionFor(0))),
arguments(idFor(1), expired, false, null),
arguments(idFor(1), notExpired, false, null),
arguments(idFor(1), expired, true, null),
arguments(idFor(1), notExpired, true, null)
);
}
@@ -161,7 +161,7 @@ public class ConfiguredProfileBadgeConverterTest {
ArgumentCaptor<Control> controlArgumentCaptor = setupResourceBundle(enGb);
badgeConverter.convert(List.of(enGb, en, esUs),
List.of(new AccountBadge(nameFor(0), Instant.ofEpochSecond(43), true)));
List.of(new AccountBadge(idFor(0), Instant.ofEpochSecond(43), true)));
Control control = controlArgumentCaptor.getValue();
assertThatNullPointerException().isThrownBy(() -> control.getFormats(null));
@@ -186,7 +186,7 @@ public class ConfiguredProfileBadgeConverterTest {
// this should always terminate at the system default locale since the development defined bundle should get
// returned at that point anyhow
badgeConverter.convert(List.of(enGb, Locale.getDefault(), en, esUs),
List.of(new AccountBadge(nameFor(0), Instant.ofEpochSecond(43), true)));
List.of(new AccountBadge(idFor(0), Instant.ofEpochSecond(43), true)));
Control control2 = controlArgumentCaptor.getValue();
assertThat(control2.getFallbackLocale(ConfiguredProfileBadgeConverter.BASE_NAME, enGb)).isEqualTo(

View File

@@ -5,9 +5,6 @@
package org.whispersystems.textsecuregcm.storage;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -17,7 +14,6 @@ import com.opentable.db.postgres.embedded.LiquibasePreparer;
import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension;
import com.opentable.db.postgres.junit5.PreparedDbExtension;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
@@ -25,12 +21,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.AccountAttributes;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
@@ -274,46 +268,4 @@ class AccountsDynamoDbMigrationCrawlerIntegrationTest {
ACCOUNTS_DYNAMODB_EXTENSION.getDynamoDbClient().createTable(createMigrationRetryAccountsTableRequest);
}
@Test
void testReregistration() throws Exception {
final String e164 = "+18001111234";
final UUID uuid = accountsManager.create(e164, "qefiv132oin4", "OWT", new AccountAttributes()).getUuid();
assertEquals(1, getAllPostgresAccounts().size());
assertTrue(getAllDynamoAccounts().isEmpty());
accountMigrationConfiguration.setReadEnabled(true);
accountMigrationConfiguration.setDeleteEnabled(true);
accountMigrationConfiguration.setWriteEnabled(true);
accountsManager.create(e164, "qefiv132oin4", "OWT", new AccountAttributes());
assertEquals(1, getAllPostgresAccounts().size());
assertTrue(getAllDynamoAccounts().isEmpty());
assertEquals(uuid, accountsManager.get(e164).orElseThrow().getUuid());
accountMigrationConfiguration.setBackgroundMigrationExecutorThreads(5);
accountDatabaseCrawler.doPeriodicWork();
assertEquals(1, getAllDynamoAccounts().size());
final Optional<Account> dbAccount = accounts.get(e164);
final Optional<Account> dynamoAccount = accountsDynamoDb.get(e164);
assertAll(() -> assertTrue(dbAccount.isPresent()),
() -> assertTrue(dynamoAccount.isPresent()),
() -> assertEquals(Optional.empty(), accountsManager.compareAccounts(dbAccount, dynamoAccount)));
}
private List<Account> getAllPostgresAccounts() {
return accounts.getAllFrom(100).getAccounts();
}
private List<Account> getAllDynamoAccounts() {
return accountsDynamoDb.getAllFromStart(100, 1000).getAccounts();
}
}

View File

@@ -25,6 +25,7 @@ import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.LocalSecondaryIndex;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback {
@@ -45,6 +46,7 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
private final List<AttributeDefinition> attributeDefinitions;
private final List<GlobalSecondaryIndex> globalSecondaryIndexes;
private final List<LocalSecondaryIndex> localSecondaryIndexes;
private final long readCapacityUnits;
private final long writeCapacityUnits;
@@ -53,12 +55,16 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
private DynamoDbAsyncClient dynamoAsyncDB2;
private AmazonDynamoDB legacyDynamoClient;
private DynamoDbExtension(String tableName, String hashKey, String rangeKey, List<AttributeDefinition> attributeDefinitions, List<GlobalSecondaryIndex> globalSecondaryIndexes, long readCapacityUnits,
private DynamoDbExtension(String tableName, String hashKey, String rangeKey,
List<AttributeDefinition> attributeDefinitions, List<GlobalSecondaryIndex> globalSecondaryIndexes,
final List<LocalSecondaryIndex> localSecondaryIndexes,
long readCapacityUnits,
long writeCapacityUnits) {
this.tableName = tableName;
this.hashKeyName = hashKey;
this.rangeKeyName = rangeKey;
this.localSecondaryIndexes = localSecondaryIndexes;
this.readCapacityUnits = readCapacityUnits;
this.writeCapacityUnits = writeCapacityUnits;
@@ -108,6 +114,7 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
.keySchema(keySchemaElements)
.attributeDefinitions(attributeDefinitions.isEmpty() ? null : attributeDefinitions)
.globalSecondaryIndexes(globalSecondaryIndexes.isEmpty() ? null : globalSecondaryIndexes)
.localSecondaryIndexes(localSecondaryIndexes.isEmpty() ? null : localSecondaryIndexes)
.provisionedThroughput(ProvisionedThroughput.builder()
.readCapacityUnits(readCapacityUnits)
.writeCapacityUnits(writeCapacityUnits)
@@ -150,7 +157,8 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
.build();
}
static class DynamoDbExtensionBuilder {
public static class DynamoDbExtensionBuilder {
private String tableName = DEFAULT_TABLE_NAME;
private String hashKey;
@@ -158,6 +166,7 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
private List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
private List<GlobalSecondaryIndex> globalSecondaryIndexes = new ArrayList<>();
private List<LocalSecondaryIndex> localSecondaryIndexes = new ArrayList<>();
private long readCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.readCapacityUnits();
private long writeCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.writeCapacityUnits();
@@ -166,22 +175,22 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
}
DynamoDbExtensionBuilder tableName(String databaseName) {
public DynamoDbExtensionBuilder tableName(String databaseName) {
this.tableName = databaseName;
return this;
}
DynamoDbExtensionBuilder hashKey(String hashKey) {
public DynamoDbExtensionBuilder hashKey(String hashKey) {
this.hashKey = hashKey;
return this;
}
DynamoDbExtensionBuilder rangeKey(String rangeKey) {
public DynamoDbExtensionBuilder rangeKey(String rangeKey) {
this.rangeKey = rangeKey;
return this;
}
DynamoDbExtensionBuilder attributeDefinition(AttributeDefinition attributeDefinition) {
public DynamoDbExtensionBuilder attributeDefinition(AttributeDefinition attributeDefinition) {
attributeDefinitions.add(attributeDefinition);
return this;
}
@@ -191,9 +200,14 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
return this;
}
DynamoDbExtension build() {
public DynamoDbExtensionBuilder localSecondaryIndex(LocalSecondaryIndex index) {
localSecondaryIndexes.add(index);
return this;
}
public DynamoDbExtension build() {
return new DynamoDbExtension(tableName, hashKey, rangeKey,
attributeDefinitions, globalSecondaryIndexes, readCapacityUnits, writeCapacityUnits);
attributeDefinitions, globalSecondaryIndexes, localSecondaryIndexes, readCapacityUnits, writeCapacityUnits);
}
}

View File

@@ -5,7 +5,8 @@
package org.whispersystems.textsecuregcm.storage;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTimeout;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -24,158 +25,159 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope.Type;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbRule;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
class MessagePersisterIntegrationTest {
@Rule
public MessagesDynamoDbRule messagesDynamoDbRule = new MessagesDynamoDbRule();
@RegisterExtension
static DynamoDbExtension dynamoDbExtension = MessagesDynamoDbExtension.build();
private ExecutorService notificationExecutorService;
private MessagesCache messagesCache;
private MessagesManager messagesManager;
private MessagePersister messagePersister;
private Account account;
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private static final Duration PERSIST_DELAY = Duration.ofMinutes(10);
private ExecutorService notificationExecutorService;
private MessagesCache messagesCache;
private MessagesManager messagesManager;
private MessagePersister messagePersister;
private Account account;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
private static final Duration PERSIST_DELAY = Duration.ofMinutes(10);
getRedisCluster().useCluster(connection -> {
connection.sync().flushall();
connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz");
});
@BeforeEach
void setUp() throws Exception {
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> {
connection.sync().flushall();
connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz");
});
final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDbClient(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));
final AccountsManager accountsManager = mock(AccountsManager.class);
final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(),
MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(14));
final AccountsManager accountsManager = mock(AccountsManager.class);
final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class), mock(ReportMessageManager.class));
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, PERSIST_DELAY);
notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), notificationExecutorService);
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class),
mock(ReportMessageManager.class));
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY);
account = mock(Account.class);
account = mock(Account.class);
final UUID accountUuid = UUID.randomUUID();
final UUID accountUuid = UUID.randomUUID();
when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(accountUuid);
when(accountsManager.get(accountUuid)).thenReturn(Optional.of(account));
when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(accountUuid);
when(accountsManager.get(accountUuid)).thenReturn(Optional.of(account));
when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
messagesCache.start();
}
messagesCache.start();
}
@After
@Override
public void tearDown() throws Exception {
super.tearDown();
@AfterEach
void tearDown() throws Exception {
notificationExecutorService.shutdown();
notificationExecutorService.awaitTermination(15, TimeUnit.SECONDS);
}
notificationExecutorService.shutdown();
notificationExecutorService.awaitTermination(15, TimeUnit.SECONDS);
}
@Test
void testScheduledPersistMessages() {
@Test(timeout = 15_000)
public void testScheduledPersistMessages() throws Exception {
final int messageCount = 377;
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(messageCount);
final Instant now = Instant.now();
final int messageCount = 377;
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(messageCount);
final Instant now = Instant.now();
for (int i = 0; i < messageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
final long timestamp = now.minus(PERSIST_DELAY.multipliedBy(2)).toEpochMilli() + i;
assertTimeout(Duration.ofSeconds(15), () -> {
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, timestamp);
for (int i = 0; i < messageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
final long timestamp = now.minus(PERSIST_DELAY.multipliedBy(2)).toEpochMilli() + i;
messagesCache.insert(messageGuid, account.getUuid(), 1, message);
expectedMessages.add(message);
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, timestamp);
messagesCache.insert(messageGuid, account.getUuid(), 1, message);
expectedMessages.add(message);
}
REDIS_CLUSTER_EXTENSION.getRedisCluster()
.useCluster(connection -> connection.sync().set(MessagesCache.NEXT_SLOT_TO_PERSIST_KEY,
String.valueOf(SlotHash.getSlot(MessagesCache.getMessageQueueKey(account.getUuid(), 1)) - 1)));
final AtomicBoolean messagesPersisted = new AtomicBoolean(false);
messagesManager.addMessageAvailabilityListener(account.getUuid(), 1, new MessageAvailabilityListener() {
@Override
public void handleNewMessagesAvailable() {
}
getRedisCluster().useCluster(connection -> connection.sync().set(MessagesCache.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(SlotHash.getSlot(MessagesCache.getMessageQueueKey(account.getUuid(), 1)) - 1)));
final AtomicBoolean messagesPersisted = new AtomicBoolean(false);
messagesManager.addMessageAvailabilityListener(account.getUuid(), 1, new MessageAvailabilityListener() {
@Override
public void handleNewMessagesAvailable() {
}
@Override
public void handleNewEphemeralMessageAvailable() {
}
@Override
public void handleMessagesPersisted() {
synchronized (messagesPersisted) {
messagesPersisted.set(true);
messagesPersisted.notifyAll();
}
}
});
messagePersister.start();
synchronized (messagesPersisted) {
while (!messagesPersisted.get()) {
messagesPersisted.wait();
}
@Override
public void handleNewEphemeralMessageAvailable() {
}
messagePersister.stop();
@Override
public void handleMessagesPersisted() {
synchronized (messagesPersisted) {
messagesPersisted.set(true);
messagesPersisted.notifyAll();
}
}
});
final List<MessageProtos.Envelope> persistedMessages = new ArrayList<>(messageCount);
messagePersister.start();
DynamoDbClient dynamoDB = messagesDynamoDbRule.getDynamoDbClient();
synchronized (messagesPersisted) {
while (!messagesPersisted.get()) {
messagesPersisted.wait();
}
}
messagePersister.stop();
final List<MessageProtos.Envelope> persistedMessages = new ArrayList<>(messageCount);
DynamoDbClient dynamoDB = dynamoDbExtension.getDynamoDbClient();
for (Map<String, AttributeValue> item : dynamoDB
.scan(ScanRequest.builder().tableName(MessagesDynamoDbRule.TABLE_NAME).build()).items()) {
.scan(ScanRequest.builder().tableName(MessagesDynamoDbExtension.TABLE_NAME).build()).items()) {
persistedMessages.add(MessageProtos.Envelope.newBuilder()
.setServerGuid(AttributeValues.getUUID(item, "U", null).toString())
.setType(MessageProtos.Envelope.Type.valueOf(AttributeValues.getInt(item, "T", -1)))
.setType(Type.forNumber(AttributeValues.getInt(item, "T", -1)))
.setTimestamp(AttributeValues.getLong(item, "TS", -1))
.setServerTimestamp(extractServerTimestamp(AttributeValues.getByteArray(item, "S", null)))
.setContent(ByteString.copyFrom(AttributeValues.getByteArray(item, "C", null)))
.build());
}
}
assertEquals(expectedMessages, persistedMessages);
}
assertEquals(expectedMessages, persistedMessages);
});
}
private static UUID convertBinaryToUuid(byte[] bytes) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
long msb = bb.getLong();
long lsb = bb.getLong();
return new UUID(msb, lsb);
}
private static long extractServerTimestamp(byte[] bytes) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
bb.getLong();
return bb.getLong();
}
private static long extractServerTimestamp(byte[] bytes) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
bb.getLong();
return bb.getLong();
}
private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final long timestamp) {
return MessageProtos.Envelope.newBuilder()
.setTimestamp(timestamp)
.setServerTimestamp(timestamp)
.setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256)))
.setType(MessageProtos.Envelope.Type.CIPHERTEXT)
.setServerGuid(messageGuid.toString())
.build();
}
private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final long timestamp) {
return MessageProtos.Envelope.newBuilder()
.setTimestamp(timestamp)
.setServerTimestamp(timestamp)
.setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256)))
.setType(MessageProtos.Envelope.Type.CIPHERTEXT)
.setServerGuid(messageGuid.toString())
.build();
}
}

View File

@@ -291,7 +291,7 @@ class MessageControllerTest {
assertThat("Good Response", response.getStatus(), is(equalTo(200)));
verify(messageSender, never()).sendMessage(any(), any(), any(), anyBoolean());
verify(receiptSender).sendReceipt(any(), eq(AuthHelper.VALID_NUMBER), anyLong());
verify(receiptSender).sendReceipt(any(), eq(AuthHelper.VALID_UUID), anyLong());
}
@ParameterizedTest
@@ -575,7 +575,7 @@ class MessageControllerTest {
.delete();
assertThat("Good Response Code", response.getStatus(), is(equalTo(204)));
verify(receiptSender).sendReceipt(any(AuthenticatedAccount.class), eq("+14152222222"), eq(timestamp));
verify(receiptSender).sendReceipt(any(AuthenticatedAccount.class), eq(sourceUuid), eq(timestamp));
response = resources.getJerseyTest()
.target(String.format("/v1/messages/uuid/%s", uuid2))

View File

@@ -21,6 +21,8 @@ import com.google.common.collect.ImmutableSet;
import io.dropwizard.auth.PolymorphicAuthValueFactoryProvider;
import io.dropwizard.testing.junit5.DropwizardExtensionsSupport;
import io.dropwizard.testing.junit5.ResourceExtension;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -29,6 +31,7 @@ import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.RandomStringUtils;
import org.assertj.core.api.Condition;
import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -45,6 +48,7 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfigurati
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicPaymentsConfiguration;
import org.whispersystems.textsecuregcm.controllers.ProfileController;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.entities.Badge;
import org.whispersystems.textsecuregcm.entities.CreateProfileRequest;
import org.whispersystems.textsecuregcm.entities.Profile;
import org.whispersystems.textsecuregcm.entities.ProfileAvatarUploadAttributes;
@@ -97,7 +101,15 @@ class ProfileControllerTest {
profilesManager,
usernamesManager,
dynamicConfigurationManager,
(acceptableLanguages, accountBadges) -> List.of(), // TODO: Test with some badges.
(acceptableLanguages, accountBadges) -> {
try {
return List.of(
new Badge("TEST1", "other", new URL("https://example.com/badge/1"), "Test Badge", "This badge is in unit tests.")
);
} catch (MalformedURLException e) {
throw new AssertionError(e);
}
},
s3client,
postPolicyGenerator,
policySigner,
@@ -184,6 +196,8 @@ class ProfileControllerTest {
assertThat(profile.getName()).isEqualTo("baz");
assertThat(profile.getAvatar()).isEqualTo("profiles/bang");
assertThat(profile.getUsername()).isEqualTo("n00bkiller");
assertThat(profile.getBadges()).hasSize(1).element(0).has(new Condition<>(
badge -> "Test Badge".equals(badge.getName()), "has badge with expected name"));
verify(accountsManager).get(AuthHelper.VALID_UUID_TWO);
verify(usernamesManager, times(1)).get(eq(AuthHelper.VALID_UUID_TWO));
@@ -203,6 +217,8 @@ class ProfileControllerTest {
assertThat(profile.getAvatar()).isEqualTo("profiles/bang");
assertThat(profile.getUsername()).isEqualTo("n00bkiller");
assertThat(profile.getUuid()).isEqualTo(AuthHelper.VALID_UUID_TWO);
assertThat(profile.getBadges()).hasSize(1).element(0).has(new Condition<>(
badge -> "Test Badge".equals(badge.getName()), "has badge with expected name"));
verify(accountsManager, times(1)).get(eq(AuthHelper.VALID_UUID_TWO));
verify(usernamesManager, times(1)).get(eq("n00bkiller"));
@@ -563,6 +579,8 @@ class ProfileControllerTest {
assertThat(profile.getCapabilities().isGv1Migration()).isFalse();
assertThat(profile.getUsername()).isEqualTo("n00bkiller");
assertThat(profile.getUuid()).isNull();
assertThat(profile.getBadges()).hasSize(1).element(0).has(new Condition<>(
badge -> "Test Badge".equals(badge.getName()), "has badge with expected name"));
verify(accountsManager, times(1)).get(eq(AuthHelper.VALID_UUID_TWO));
verify(usernamesManager, times(1)).get(eq(AuthHelper.VALID_UUID_TWO));

View File

@@ -5,21 +5,38 @@
package org.whispersystems.textsecuregcm.tests.storage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.storage.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
class AccountDatabaseCrawlerTest {
@@ -132,7 +149,43 @@ class AccountDatabaseCrawlerTest {
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
verifyZeroInteractions(account1);
verifyNoInteractions(account1);
verifyNoMoreInteractions(account2);
verifyNoMoreInteractions(accounts);
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(cache);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCrawlChunk_useDynamoDedicatedMigrationCrawler(final boolean dedicatedMigrationCrawler) throws Exception {
crawler.setDedicatedDynamoMigrationCrawler(dedicatedMigrationCrawler);
when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(true);
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));
when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1));
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isFalse();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(dedicatedMigrationCrawler ? 1 : 0)).getLastUuid();
verify(cache, times(dedicatedMigrationCrawler ? 0 : 1)).getLastUuidDynamo();
if (dedicatedMigrationCrawler) {
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
} else {
verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE));
}
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(cache, times(dedicatedMigrationCrawler ? 1 : 0)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(dedicatedMigrationCrawler ? 0 : 1)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
verifyNoInteractions(account1);
verifyNoMoreInteractions(account2);
verifyNoMoreInteractions(accounts);

View File

@@ -16,6 +16,8 @@ import static org.whispersystems.textsecuregcm.tests.util.DevicesHelper.createDe
import static org.whispersystems.textsecuregcm.tests.util.DevicesHelper.setEnabled;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -24,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountBadge;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.Device.DeviceCapabilities;
@@ -342,4 +345,37 @@ class AccountTest {
assertThat(account.getNextDeviceId()).isEqualTo(2L);
}
@Test
void addAndRemoveBadges() {
final Account account = new Account("+14151234567", UUID.randomUUID(), Set.of(createDevice(Device.MASTER_ID)), new byte[0]);
final Clock clock = mock(Clock.class);
when(clock.instant()).thenReturn(Instant.ofEpochSecond(40));
account.addBadge(clock, new AccountBadge("foo", Instant.ofEpochSecond(42), false));
account.addBadge(clock, new AccountBadge("bar", Instant.ofEpochSecond(44), true));
account.addBadge(clock, new AccountBadge("baz", Instant.ofEpochSecond(46), true));
assertThat(account.getBadges()).hasSize(3);
account.removeBadge(clock, "baz");
assertThat(account.getBadges()).hasSize(2);
account.addBadge(clock, new AccountBadge("foo", Instant.ofEpochSecond(50), false));
assertThat(account.getBadges()).hasSize(2).element(0).satisfies(badge -> {
assertThat(badge.getId()).isEqualTo("foo");
assertThat(badge.getExpiration().getEpochSecond()).isEqualTo(50);
assertThat(badge.isVisible()).isFalse();
});
account.addBadge(clock, new AccountBadge("foo", Instant.ofEpochSecond(51), true));
assertThat(account.getBadges()).hasSize(2).element(0).satisfies(badge -> {
assertThat(badge.getId()).isEqualTo("foo");
assertThat(badge.getExpiration().getEpochSecond()).isEqualTo(51);
assertThat(badge.isVisible()).isTrue();
});
}
}

View File

@@ -470,7 +470,7 @@ class AccountsManagerTest {
final UUID uuidA = UUID.randomUUID();
final Account a1 = new Account("+14152222222", uuidA, new HashSet<>(), new byte[16]);
assertEquals(Optional.of("dbMissing"), accountsManager.compareAccounts(Optional.empty(), Optional.of(a1)));
assertEquals(Optional.of("primaryMissing"), accountsManager.compareAccounts(Optional.empty(), Optional.of(a1)));
final Account a2 = new Account("+14152222222", uuidA, new HashSet<>(), new byte[16]);

View File

@@ -13,15 +13,18 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtension;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbRule;
import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension;
class MessagesDynamoDbTest {
public class MessagesDynamoDbTest {
private static final Random random = new Random();
private static final MessageProtos.Envelope MESSAGE1;
private static final MessageProtos.Envelope MESSAGE2;
@@ -61,27 +64,31 @@ public class MessagesDynamoDbTest {
private MessagesDynamoDb messagesDynamoDb;
@ClassRule
public static MessagesDynamoDbRule dynamoDbRule = new MessagesDynamoDbRule();
@Before
public void setup() {
messagesDynamoDb = new MessagesDynamoDb(dynamoDbRule.getDynamoDbClient(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));
@RegisterExtension
static DynamoDbExtension dynamoDbExtension = MessagesDynamoDbExtension.build();
@BeforeEach
void setup() {
messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), MessagesDynamoDbExtension.TABLE_NAME,
Duration.ofDays(14));
}
@Test
public void testServerStart() {
void testServerStart() {
}
@Test
public void testSimpleFetchAfterInsert() {
void testSimpleFetchAfterInsert() {
final UUID destinationUuid = UUID.randomUUID();
final int destinationDeviceId = random.nextInt(255) + 1;
messagesDynamoDb.store(List.of(MESSAGE1, MESSAGE2, MESSAGE3), destinationUuid, destinationDeviceId);
final List<OutgoingMessageEntity> messagesStored = messagesDynamoDb.load(destinationUuid, destinationDeviceId, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE);
final List<OutgoingMessageEntity> messagesStored = messagesDynamoDb.load(destinationUuid, destinationDeviceId,
MessagesDynamoDb.RESULT_SET_CHUNK_SIZE);
assertThat(messagesStored).isNotNull().hasSize(3);
final MessageProtos.Envelope firstMessage = MESSAGE1.getServerGuid().compareTo(MESSAGE3.getServerGuid()) < 0 ? MESSAGE1 : MESSAGE3;
final MessageProtos.Envelope firstMessage =
MESSAGE1.getServerGuid().compareTo(MESSAGE3.getServerGuid()) < 0 ? MESSAGE1 : MESSAGE3;
final MessageProtos.Envelope secondMessage = firstMessage == MESSAGE1 ? MESSAGE3 : MESSAGE1;
assertThat(messagesStored).element(0).satisfies(verify(firstMessage));
assertThat(messagesStored).element(1).satisfies(verify(secondMessage));
@@ -89,61 +96,76 @@ public class MessagesDynamoDbTest {
}
@Test
public void testDeleteForDestination() {
void testDeleteForDestination() {
final UUID destinationUuid = UUID.randomUUID();
final UUID secondDestinationUuid = UUID.randomUUID();
messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, 1);
messagesDynamoDb.store(List.of(MESSAGE2), secondDestinationUuid, 1);
messagesDynamoDb.store(List.of(MESSAGE3), destinationUuid, 2);
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE1));
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE3));
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE2));
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).satisfies(verify(MESSAGE1));
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).satisfies(verify(MESSAGE3));
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).satisfies(verify(MESSAGE2));
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid);
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty();
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty();
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE2));
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).satisfies(verify(MESSAGE2));
}
@Test
public void testDeleteForDestinationDevice() {
void testDeleteForDestinationDevice() {
final UUID destinationUuid = UUID.randomUUID();
final UUID secondDestinationUuid = UUID.randomUUID();
messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, 1);
messagesDynamoDb.store(List.of(MESSAGE2), secondDestinationUuid, 1);
messagesDynamoDb.store(List.of(MESSAGE3), destinationUuid, 2);
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE1));
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE3));
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE2));
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).satisfies(verify(MESSAGE1));
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).satisfies(verify(MESSAGE3));
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).satisfies(verify(MESSAGE2));
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, 2);
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE1));
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).satisfies(verify(MESSAGE1));
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty();
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE2));
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).satisfies(verify(MESSAGE2));
}
@Test
public void testDeleteMessageByDestinationAndGuid() {
void testDeleteMessageByDestinationAndGuid() {
final UUID destinationUuid = UUID.randomUUID();
final UUID secondDestinationUuid = UUID.randomUUID();
messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, 1);
messagesDynamoDb.store(List.of(MESSAGE2), secondDestinationUuid, 1);
messagesDynamoDb.store(List.of(MESSAGE3), destinationUuid, 2);
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE1));
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE3));
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE2));
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).satisfies(verify(MESSAGE1));
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).satisfies(verify(MESSAGE3));
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).satisfies(verify(MESSAGE2));
messagesDynamoDb.deleteMessageByDestinationAndGuid(secondDestinationUuid,
UUID.fromString(MESSAGE2.getServerGuid()));
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE1));
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1).element(0).satisfies(verify(MESSAGE3));
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty();
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).satisfies(verify(MESSAGE1));
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).satisfies(verify(MESSAGE3));
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.isEmpty();
}
private static void verify(OutgoingMessageEntity retrieved, MessageProtos.Envelope inserted) {
@@ -164,6 +186,7 @@ public class MessagesDynamoDbTest {
}
private static final class VerifyMessage implements Consumer<OutgoingMessageEntity> {
private final MessageProtos.Envelope expected;
public VerifyMessage(MessageProtos.Envelope expected) {

View File

@@ -5,42 +5,36 @@
package org.whispersystems.textsecuregcm.tests.util;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtension;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.LocalSecondaryIndex;
import software.amazon.awssdk.services.dynamodb.model.Projection;
import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
public class MessagesDynamoDbRule extends LocalDynamoDbRule {
public class MessagesDynamoDbExtension {
public static final String TABLE_NAME = "Signal_Messages_UnitTest";
@Override
protected void before() throws Throwable {
super.before();
getDynamoDbClient().createTable(CreateTableRequest.builder()
public static DynamoDbExtension build() {
return DynamoDbExtension.builder()
.tableName(TABLE_NAME)
.keySchema(KeySchemaElement.builder().attributeName("H").keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName("S").keyType(KeyType.RANGE).build())
.attributeDefinitions(
AttributeDefinition.builder().attributeName("H").attributeType(ScalarAttributeType.B).build(),
AttributeDefinition.builder().attributeName("S").attributeType(ScalarAttributeType.B).build(),
.hashKey("H")
.rangeKey("S")
.attributeDefinition(
AttributeDefinition.builder().attributeName("H").attributeType(ScalarAttributeType.B).build())
.attributeDefinition(
AttributeDefinition.builder().attributeName("S").attributeType(ScalarAttributeType.B).build())
.attributeDefinition(
AttributeDefinition.builder().attributeName("U").attributeType(ScalarAttributeType.B).build())
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(20L).writeCapacityUnits(20L).build())
.localSecondaryIndexes(LocalSecondaryIndex.builder().indexName("Message_UUID_Index")
.localSecondaryIndex(LocalSecondaryIndex.builder().indexName("Message_UUID_Index")
.keySchema(KeySchemaElement.builder().attributeName("H").keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName("U").keyType(KeyType.RANGE).build())
.projection(Projection.builder().projectionType(ProjectionType.KEYS_ONLY).build())
.build())
.build());
.build();
}
@Override
protected void after() {
super.after();
}
}

View File

@@ -5,9 +5,10 @@
package org.whispersystems.textsecuregcm.websocket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTimeout;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
@@ -34,10 +35,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
@@ -45,195 +46,208 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtension;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbRule;
import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest {
class WebSocketConnectionIntegrationTest {
@Rule
public MessagesDynamoDbRule messagesDynamoDbRule = new MessagesDynamoDbRule();
@RegisterExtension
static DynamoDbExtension dynamoDbExtension = MessagesDynamoDbExtension.build();
private ExecutorService executorService;
private MessagesDynamoDb messagesDynamoDb;
private MessagesCache messagesCache;
private ReportMessageManager reportMessageManager;
private Account account;
private Device device;
private WebSocketClient webSocketClient;
private WebSocketConnection webSocketConnection;
private ScheduledExecutorService retrySchedulingExecutor;
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private long serialTimestamp = System.currentTimeMillis();
private ExecutorService executorService;
private MessagesDynamoDb messagesDynamoDb;
private MessagesCache messagesCache;
private ReportMessageManager reportMessageManager;
private Account account;
private Device device;
private WebSocketClient webSocketClient;
private WebSocketConnection webSocketConnection;
private ScheduledExecutorService retrySchedulingExecutor;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
private long serialTimestamp = System.currentTimeMillis();
executorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), executorService);
messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDbClient(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));
reportMessageManager = mock(ReportMessageManager.class);
account = mock(Account.class);
device = mock(Device.class);
webSocketClient = mock(WebSocketClient.class);
retrySchedulingExecutor = Executors.newSingleThreadScheduledExecutor();
@BeforeEach
void setUp() throws Exception {
when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID());
when(device.getId()).thenReturn(1L);
executorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), executorService);
messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), MessagesDynamoDbExtension.TABLE_NAME,
Duration.ofDays(7));
reportMessageManager = mock(ReportMessageManager.class);
account = mock(Account.class);
device = mock(Device.class);
webSocketClient = mock(WebSocketClient.class);
retrySchedulingExecutor = Executors.newSingleThreadScheduledExecutor();
webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class), reportMessageManager),
new AuthenticatedAccount(() -> new Pair<>(account, device)),
device,
webSocketClient,
retrySchedulingExecutor);
}
when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID());
when(device.getId()).thenReturn(1L);
@After
@Override
public void tearDown() throws Exception {
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.SECONDS);
webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class), reportMessageManager),
new AuthenticatedAccount(() -> new Pair<>(account, device)),
device,
webSocketClient,
retrySchedulingExecutor);
}
retrySchedulingExecutor.shutdown();
retrySchedulingExecutor.awaitTermination(2, TimeUnit.SECONDS);
@AfterEach
void tearDown() throws Exception {
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.SECONDS);
super.tearDown();
}
retrySchedulingExecutor.shutdown();
retrySchedulingExecutor.awaitTermination(2, TimeUnit.SECONDS);
}
@Test(timeout = 15_000)
public void testProcessStoredMessages() throws InterruptedException {
final int persistedMessageCount = 207;
final int cachedMessageCount = 173;
@Test
void testProcessStoredMessages() {
final int persistedMessageCount = 207;
final int cachedMessageCount = 173;
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount);
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount);
{
final List<MessageProtos.Envelope> persistedMessages = new ArrayList<>(persistedMessageCount);
assertTimeout(Duration.ofSeconds(15), () -> {
for (int i = 0; i < persistedMessageCount; i++) {
final MessageProtos.Envelope envelope = generateRandomMessage(UUID.randomUUID());
{
final List<MessageProtos.Envelope> persistedMessages = new ArrayList<>(persistedMessageCount);
persistedMessages.add(envelope);
expectedMessages.add(envelope);
}
for (int i = 0; i < persistedMessageCount; i++) {
final MessageProtos.Envelope envelope = generateRandomMessage(UUID.randomUUID());
messagesDynamoDb.store(persistedMessages, account.getUuid(), device.getId());
persistedMessages.add(envelope);
expectedMessages.add(envelope);
}
for (int i = 0; i < cachedMessageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope envelope = generateRandomMessage(messageGuid);
messagesDynamoDb.store(persistedMessages, account.getUuid(), device.getId());
}
messagesCache.insert(messageGuid, account.getUuid(), device.getId(), envelope);
expectedMessages.add(envelope);
}
for (int i = 0; i < cachedMessageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope envelope = generateRandomMessage(messageGuid);
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
final AtomicBoolean queueCleared = new AtomicBoolean(false);
messagesCache.insert(messageGuid, account.getUuid(), device.getId(), envelope);
expectedMessages.add(envelope);
}
when(successResponse.getStatus()).thenReturn(200);
when(webSocketClient.sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), any())).thenReturn(CompletableFuture.completedFuture(successResponse));
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
final AtomicBoolean queueCleared = new AtomicBoolean(false);
when(webSocketClient.sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), anyList(), any())).thenAnswer((Answer<CompletableFuture<WebSocketResponseMessage>>)invocation -> {
when(successResponse.getStatus()).thenReturn(200);
when(webSocketClient.sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), any())).thenReturn(
CompletableFuture.completedFuture(successResponse));
when(webSocketClient.sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), anyList(), any())).thenAnswer(
(Answer<CompletableFuture<WebSocketResponseMessage>>) invocation -> {
synchronized (queueCleared) {
queueCleared.set(true);
queueCleared.notifyAll();
queueCleared.set(true);
queueCleared.notifyAll();
}
return CompletableFuture.completedFuture(successResponse);
});
webSocketConnection.processStoredMessages();
synchronized (queueCleared) {
while (!queueCleared.get()) {
queueCleared.wait();
}
}
@SuppressWarnings("unchecked") final ArgumentCaptor<Optional<byte[]>> messageBodyCaptor = ArgumentCaptor.forClass(
Optional.class);
verify(webSocketClient, times(persistedMessageCount + cachedMessageCount)).sendRequest(eq("PUT"),
eq("/api/v1/message"), anyList(), messageBodyCaptor.capture());
verify(webSocketClient).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), anyList(), eq(Optional.empty()));
final List<MessageProtos.Envelope> sentMessages = new ArrayList<>();
for (final Optional<byte[]> maybeMessageBody : messageBodyCaptor.getAllValues()) {
maybeMessageBody.ifPresent(messageBytes -> {
try {
sentMessages.add(MessageProtos.Envelope.parseFrom(messageBytes));
} catch (final InvalidProtocolBufferException e) {
fail("Could not parse sent message");
}
});
}
webSocketConnection.processStoredMessages();
assertEquals(expectedMessages, sentMessages);
});
}
synchronized (queueCleared) {
while (!queueCleared.get()) {
queueCleared.wait();
}
@Test
void testProcessStoredMessagesClientClosed() {
final int persistedMessageCount = 207;
final int cachedMessageCount = 173;
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount);
assertTimeout(Duration.ofSeconds(15), () -> {
{
final List<MessageProtos.Envelope> persistedMessages = new ArrayList<>(persistedMessageCount);
for (int i = 0; i < persistedMessageCount; i++) {
final MessageProtos.Envelope envelope = generateRandomMessage(UUID.randomUUID());
persistedMessages.add(envelope);
expectedMessages.add(envelope);
}
@SuppressWarnings("unchecked")
final ArgumentCaptor<Optional<byte[]>> messageBodyCaptor = ArgumentCaptor.forClass(Optional.class);
messagesDynamoDb.store(persistedMessages, account.getUuid(), device.getId());
}
verify(webSocketClient, times(persistedMessageCount + cachedMessageCount)).sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), messageBodyCaptor.capture());
verify(webSocketClient).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), anyList(), eq(Optional.empty()));
for (int i = 0; i < cachedMessageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope envelope = generateRandomMessage(messageGuid);
messagesCache.insert(messageGuid, account.getUuid(), device.getId(), envelope);
final List<MessageProtos.Envelope> sentMessages = new ArrayList<>();
expectedMessages.add(envelope);
}
for (final Optional<byte[]> maybeMessageBody : messageBodyCaptor.getAllValues()) {
maybeMessageBody.ifPresent(messageBytes -> {
try {
sentMessages.add(MessageProtos.Envelope.parseFrom(messageBytes));
} catch (final InvalidProtocolBufferException e) {
fail("Could not parse sent message");
}
});
}
when(webSocketClient.sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), any())).thenReturn(
CompletableFuture.failedFuture(new IOException("Connection closed")));
assertEquals(expectedMessages, sentMessages);
}
webSocketConnection.processStoredMessages();
@Test(timeout = 15_000)
public void testProcessStoredMessagesClientClosed() {
final int persistedMessageCount = 207;
final int cachedMessageCount = 173;
//noinspection unchecked
ArgumentCaptor<Optional<byte[]>> messageBodyCaptor = ArgumentCaptor.forClass(Optional.class);
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount);
{
final List<MessageProtos.Envelope> persistedMessages = new ArrayList<>(persistedMessageCount);
for (int i = 0; i < persistedMessageCount; i++) {
final MessageProtos.Envelope envelope = generateRandomMessage(UUID.randomUUID());
persistedMessages.add(envelope);
expectedMessages.add(envelope);
}
messagesDynamoDb.store(persistedMessages, account.getUuid(), device.getId());
}
for (int i = 0; i < cachedMessageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope envelope = generateRandomMessage(messageGuid);
messagesCache.insert(messageGuid, account.getUuid(), device.getId(), envelope);
expectedMessages.add(envelope);
}
when(webSocketClient.sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), any())).thenReturn(CompletableFuture.failedFuture(new IOException("Connection closed")));
webSocketConnection.processStoredMessages();
//noinspection unchecked
ArgumentCaptor<Optional<byte[]>> messageBodyCaptor = ArgumentCaptor.forClass(Optional.class);
verify(webSocketClient, atMost(persistedMessageCount + cachedMessageCount)).sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), messageBodyCaptor.capture());
verify(webSocketClient, never()).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), anyList(), eq(Optional.empty()));
verify(webSocketClient, atMost(persistedMessageCount + cachedMessageCount)).sendRequest(eq("PUT"),
eq("/api/v1/message"), anyList(), messageBodyCaptor.capture());
verify(webSocketClient, never()).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), anyList(),
eq(Optional.empty()));
final List<MessageProtos.Envelope> sentMessages = messageBodyCaptor.getAllValues().stream()
.map(Optional::get)
.map(messageBytes -> {
try {
return Envelope.parseFrom(messageBytes);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
try {
return Envelope.parseFrom(messageBytes);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
assertTrue(expectedMessages.containsAll(sentMessages));
});
}
private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid) {

View File

@@ -210,7 +210,7 @@ public class WebSocketConnectionTest {
futures.get(2).completeExceptionally(new IOException());
verify(storedMessages, times(1)).delete(eq(accountUuid), eq(2L), eq(outgoingMessages.get(1).getGuid()));
verify(receiptSender, times(1)).sendReceipt(eq(auth), eq("sender1"), eq(2222L));
verify(receiptSender, times(1)).sendReceipt(eq(auth), eq(senderOneUuid), eq(2222L));
connection.stop();
verify(client).close(anyInt(), anyString());
@@ -279,6 +279,8 @@ public class WebSocketConnectionTest {
public void testPendingSend() throws Exception {
MessagesManager storedMessages = mock(MessagesManager.class);
final UUID senderTwoUuid = UUID.randomUUID();
final Envelope firstMessage = Envelope.newBuilder()
.setLegacyMessage(ByteString.copyFrom("first".getBytes()))
.setSource("sender1")
@@ -291,7 +293,7 @@ public class WebSocketConnectionTest {
final Envelope secondMessage = Envelope.newBuilder()
.setLegacyMessage(ByteString.copyFrom("second".getBytes()))
.setSource("sender2")
.setSourceUuid(UUID.randomUUID().toString())
.setSourceUuid(senderTwoUuid.toString())
.setTimestamp(System.currentTimeMillis())
.setSourceDevice(2)
.setType(Envelope.Type.CIPHERTEXT)
@@ -361,7 +363,7 @@ public class WebSocketConnectionTest {
futures.get(1).complete(response);
futures.get(0).completeExceptionally(new IOException());
verify(receiptSender, times(1)).sendReceipt(eq(auth), eq("sender2"), eq(secondMessage.getTimestamp()));
verify(receiptSender, times(1)).sendReceipt(eq(auth), eq(senderTwoUuid), eq(secondMessage.getTimestamp()));
connection.stop();
verify(client).close(anyInt(), anyString());

View File

@@ -65,18 +65,19 @@ public class WebSocketResourceProviderFactory<T extends Principal> extends WebSo
}
}
return new WebSocketResourceProvider<T>(getRemoteAddress(request),
this.jerseyApplicationHandler,
this.environment.getRequestLog(),
authenticated,
this.environment.getMessageFactory(),
ofNullable(this.environment.getConnectListener()),
this.environment.getIdleTimeoutMillis());
return new WebSocketResourceProvider<>(getRemoteAddress(request),
this.jerseyApplicationHandler,
this.environment.getRequestLog(),
authenticated,
this.environment.getMessageFactory(),
ofNullable(this.environment.getConnectListener()),
this.environment.getIdleTimeoutMillis());
} catch (AuthenticationException | IOException e) {
logger.warn("Authentication failure", e);
try {
response.sendError(500, "Failure");
} catch (IOException ex) {}
} catch (IOException ignored) {
}
return null;
}
}