这里是以商家的服务来当成一件商品来处理的,所以service可以理解成商品。这里有一个区位的概念,即秒杀当天商家可以报名的…
这里把商家的服务当作一件商品来处理,所以 service 可以理解成商品。这里还有一个“区位”的概念:秒杀当天商家可以报名的各个时间段,我们称之为区位,比如 0 点~2 点、2 点~4 点等等。区位的实体类字段如下。
*/
1 2 3 @Data @AllArgsConstructor @NoArgsConstructor
1 2 public class Location { private Long id;
1 2 3 4 5 6 private LocalDate date; private TimeSegment timeSegment; private int locationNumber; private BigDecimal baseCost; private double sellPercent; private boolean releaseLocation;
}
*/
1 @RequiredArgsConstructor
1 2 3 4 public class Service { @Getter @Setter @NonNull
1 private String serviceCode;
1 private String serviceName;
1 private String serviceLevel;
1 private CheckStatus checkStatus;
1 private Lock lock = new ReentrantLock ();
1 private Condition condition = lock.newCondition();
}
1 2 3 商家商品(服务)数据导入到redis的商品队列中 /** * 在每天的0点开始处理,查看后一天是否有秒杀活动
*/
1 2 3 4 5 @Slf4j @Component @ElasticSimpleJob(cron="0 0 0 * * ?",jobName="loadRedis",shardingTotalCount=2,jobParameter="日期",shardingItemParameters="0=Load0,1=Load1") public class LoadDataToRedisJob implements SimpleJob { @Autowired
1 private RedisService redisService;
1 private DataDao dataDao;
1 2 3 4 5 6 7 8 public void execute (ShardingContext shardingContext) { List<Location> locationNextDayList = dataDao.findJobToday(LocalDate.now().plusDays(1 )); if (!CollectionUtils.isEmpty(locationNextDayList)) { List<TimeSegment> segmentList = locationNextDayList.stream().map(location -> location.getTimeSegment()) .collect(Collectors.toList()); switch (shardingContext.getShardingItem()) {
case 0:
1 2 3 4 5 6 7 8 9 10 11 12 13 //获取每个时间段内的所有参与秒杀的服务 segmentList.stream().map(timeSegment -> { List<Service> serviceInSegment = dataDao.findServiceInSegment(timeSegment); serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment)); return serviceInSegment; //扁平化所有服务,统一为一组List }).flatMap(services -> services.stream()).forEach(service -> { //以服务id以及秒杀时间段组合为主键 String key = service.getId() + service.getTimeSegment().toString(); //如果redis中存在该主键的队列,则清空队列 if (redisService.exists(key)) { for (int i = 0; i < redisService.llen(key); i++) { redisService.rpop(key);
}
1 2 3 //清空后,根据每个服务的参与总数,将服务按总数量推送到该主键队列中 for (int i = 0; i < service.getTotalCount(); i++) { redisService.lpush(key, JSONObject.toJSONString(service));
}
1 2 3 4 5 log.info(service.getId() + service.getTimeSegment().toString()); //以服务id+":count"组合成该服务的总数键,如果redis中存在该键,则删除 String countKey = service.getId() + ":count"; if (redisService.exists(countKey)) { redisService.del(countKey);
}
1 2 3 4 //重新将总数放入该键的redis中存储 redisService.set(countKey, String.valueOf(service.getTotalCount())); }); break;
case 1:
}
}
将服务导入到redis队列后,我们需要设立一个秒杀活动开始的标识,让秒杀下单只能在秒杀活动进行中开启,不在秒杀时间内不允许下单。
1 2 /** * 给秒杀时间点设立开启标识,每天0点开始,每2小时执行一次
*/
1 2 @ElasticSimpleJob(cron="0 0 0/2 * * ?",jobName="openTimeSeg",shardingTotalCount=1,jobParameter="日期",shardingItemParameters="0=Open0") public class OpenTimeSegmentJob implements SimpleJob {
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 //获取当天的所有秒杀区位 List<Location> locationToDayList = dataDao.findJobToday(LocalDate.now()); //如果当天有秒杀活动 if (!CollectionUtils.isEmpty(locationToDayList)) { //获取当前时间点,当前时间点不一定是准点 LocalDateTime now = LocalDateTime.now(); int year = now.getYear(); int month = now.getMonthValue(); int day = now.getDayOfMonth(); int hour = now.getHour(); //将当前时间拼装成整点 LocalDateTime beginDate = LocalDateTime.of(year, month, day, hour, 0, 0); //以整点时间为基准,在redis中放入开启秒杀时间段,119分钟后消失(每个时间段段为1小时59分钟,2小时的最后一分钟结束该时间段秒杀) redisService.set("TimeStart:" + new TimeSegment(beginDate, beginDate.plusMinutes(119)).toString(), "opened",7140); log.info(beginDate.toString() + "至" + beginDate.plusMinutes(119).toString() + "秒杀开始");
}
到了秒杀时间,我们就可以开始下单了,先定义一个秒杀单的接口
1 2 3 4 5 public interface SecOrder {
*/
1 public String makeOrder (SecOrder secOrder) ;
1 2 3 /** * 是否存在该订单编号的秒杀单 * @param orderNo
*/
1 public boolean exitsOrder (String orderNo) ;
*/
1 public void changePayStatus (String orderNo) ;
} SecOrder的实现类的各属性如下。
*/
1 2 @ServiceSecOrderVersion(value = 1) public class ServiceSecOrder implements SecOrder {
1 private Service service;
1 private TimeSegment timeSegment;
1 private BigDecimal amount;
1 private int orderStatus;
1 private LocalDateTime createDate;
然后开始下秒杀订单
1 2 3 4 5 6 7 8 public String makeOrder (SecOrder secOrder) { RedisService redisService = SpringBootUtil.getBean(RedisService.class); IdService idService = SpringBootUtil.getBean(IdService.class); MessageSender messageSender = SpringBootUtil.getBean(MessageSender.class); if (LocalDateTime.now().isBefore(((ServiceSecOrder)secOrder).getTimeSegment().getBeginTime()) || LocalDateTime.now().isAfter(((ServiceSecOrder)secOrder).getTimeSegment().getEndTime())) { throw new RuntimeException ("不在秒杀时间段内" );
}
1 2 3 4 5 //由于测试时间的问题,此处需要屏蔽,等到实际部署时需要恢复 LocalDateTime beginDate = LocalDateTime.of(year,month,day,hour,0,0); //从redis中检查是否有开启秒杀时间段 if (!redisService.exists("TimeStart:" + new TimeSegment(beginDate,beginDate.plusMinutes(119)).toString())) { throw new RuntimeException("当前时间段无秒杀");
}
1 2 3 ((ServiceSecOrder)secOrder).setId(idService.genId()); if (((ServiceSecOrder)secOrder).getNumber() > ((ServiceSecOrder)secOrder).getService().getLimitCount()) { throw new RuntimeException("秒杀数量超出限购");
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); ((ServiceSecOrder)secOrder).setOrderNo(getCode(idService)); user.setUsername(loginAppUser.getUsername()); ((ServiceSecOrder)secOrder).setUser(user); //设置订单状态0表示未支付状态 ((ServiceSecOrder)secOrder).setOrderStatus(0); ((ServiceSecOrder)secOrder).setCreateDate(LocalDateTime.now()); //设置用户秒杀队列键名(每一种服务都有独立的用户秒杀队列) //队列名由User:+服务id+时间段组成 String key = "User:" + ((ServiceSecOrder)secOrder).getService().getId() + ((ServiceSecOrder)secOrder).getTimeSegment().toString(); String serviceKey = ((ServiceSecOrder)secOrder).getService().getId() + ((ServiceSecOrder)secOrder).getService().getTimeSegment().toString(); //如果服务队列还有数据,则推送用户进队列,否则直接返回秒杀失败 if (redisService.llen(serviceKey) > 0) { //建立每个服务对每个用户对限制数量对键,格式为——"服务id:用户id" String limitUserKey = ((ServiceSecOrder) secOrder).getService().getId() + ":" + ((ServiceSecOrder) secOrder).getUser().getId(); //如果该键存在,获取该键的值(这里需要考虑分布式的并发问题的可能) //但是有幂等,所以此处不会出现一个用户同时秒杀一个商品(服务)的多个并发线程存在 if (redisService.exists(limitUserKey)) { String limitCount = redisService.get(limitUserKey); //如果该键的值达到服务商品的限制数,返回秒杀失败 if (Integer.valueOf(limitCount) == ((ServiceSecOrder) secOrder).getService().getLimitCount()) { return ((ServiceSecOrder) secOrder).getUser().getUsername() + "秒杀服务" + ((ServiceSecOrder) secOrder).getService().getServiceName() + "失败";
}
1 }else { //如果不存在,设置该键的值为0
1 redisService.set(limitUserKey,"0");
}
1 2 //将秒杀用户id推送到该队列中 redisService.lpush(key, ((ServiceSecOrder) secOrder).getUser().getId() + "");
我们将用户id推送到redis队列后就要开始匹配秒杀结果了,因为商品队列早已经在前一天就推送进去了。
1 2 /** * 秒杀结果匹配任务,每天0点开始,每2小时执行一次
*/
1 @RabbitListener(queues = {SecendKillMq.SECENDKILL_QUEUE + "_1",
SecendKillMq.SECENDKILL_QUEUE + "_2",
SecendKillMq.SECENDKILL_QUEUE + "_3",
SecendKillMq.SECENDKILL_QUEUE + "_4",
SecendKillMq.SECENDKILL_QUEUE + "_5",
SecendKillMq.SECENDKILL_QUEUE + "_6",
SecendKillMq.SECENDKILL_QUEUE + "_7",
SecendKillMq.SECENDKILL_QUEUE + "_8",
SecendKillMq.SECENDKILL_QUEUE + "_9",
1 2 3 SecendKillMq.SECENDKILL_QUEUE + "_10" }) @ElasticSimpleJob(cron="0 0 0/2 * * ?",jobName="secResult",shardingTotalCount=2,jobParameter="日期",shardingItemParameters="0=SecKill0,1=SecKill1") public class SecendKillResultJob implements SimpleJob {
1 2 private List<Service> serviceList;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 //获取当天所有的秒杀区位 List<Location> locationTodayList = dataDao.findJobToday(LocalDate.now()); if (!CollectionUtils.isEmpty(locationTodayList)) { List<TimeSegment> segmentList = locationTodayList.stream().map(location -> location.getTimeSegment()) //从所有秒杀区位时间段过滤当前秒杀时间段 this.serviceList = segmentList.stream().filter(timeSegment -> LocalDateTime.now().isAfter(timeSegment.getBeginTime()) && LocalDateTime.now().isBefore(timeSegment.getEndTime())) //将时间段转化成时间段内的秒杀服务 .map(timeSegment -> { //扁平化所有的秒杀服务,将所有当前时间段内的服务放入serviceList属性中 //就是拿出当前时间段内所有参与秒杀的服务 }).flatMap(services -> services.stream()).collect(Collectors.toList()); //并行化处理所有的秒杀服务 int lism = 0; if (serviceList.size() > Runtime.getRuntime().availableProcessors() * 2) { lism = serviceList.size(); }else { lism = Runtime.getRuntime().availableProcessors() * 2;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ForkJoinPool forkJoinPool = new ForkJoinPool(lism); try { forkJoinPool.submit(() -> this.serviceList.parallelStream().forEach(service -> { while (true) { try { service.getLock().lock(); String userKey = "User:" + service.getId() + service.getTimeSegment().toString(); String serviceKey = service.getId() + service.getTimeSegment().toString(); //如果下秒杀时间内没有用户下单该服务,则中断该服务的并行线程 //如果有用户下单则唤醒该并行线程 while (redisService.llen(userKey).equals(0L)) { try { log.info("用户队列无数据,开始中断"); service.getCondition().await(); if (now.isAfter(service.getTimeSegment().getEndTime())) {
}
1 2 } catch (InterruptedException e) { e.printStackTrace();
}
到这里,如果没有用户下单,该服务对应的线程会在条件变量上等待(await),而不会在 while (true) 的无限循环中空转。SecendKillResultJob 同时也是 RabbitMQ 的一个消费者,监听了 10 个消息队列,收到消息后进行如下处理:
@RabbitHandler
1 2 3 4 5 public void receice (byte [] data, Channel channel, Message message) throws IOException { try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false );
1 2 3 4 5 6 7 8 9 10 11 12 13 Long serviceId = unSerialize(data); log.info(serviceId + ""); //如果当前秒杀服务列表不为空 if (!CollectionUtils.isEmpty(this.serviceList)) { //从服务列表中过滤出id为MQ收取的服务ID的服务 this.serviceList.stream().filter(service -> service.getId().equals(serviceId)) .forEach(service -> { log.info("存在" + service.getId()); try { //对该服务所在线程进行唤醒 service.getCondition().signalAll(); } finally { service.getLock().unlock();
}
1 2 3 4 5 }); } catch (IOException e) { //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail");
}
1 2 3 /** * 反序列化 * @param data
*/
1 2 3 4 5 6 private Long unSerialize (byte [] data) { Input input = null ; try { Kryo kryo = new Kryo (); input = new Input (new ByteArrayInputStream (data)); return kryo.readObject(input,Long.class);
}
1 2 finally { input.close();
}
这样我们再回到ServiceSecOrder的makeOrder下单方法中,将用户下单的服务id异步发送到MQ中,去唤醒秒杀结果匹配任务继续执行。
*/
}
}
}
//如果该键存在,获取该键的值(这里需要考虑分布式的并发问题的可能)
}
}
1 2 3 //唤醒秒杀结果匹配任务继续执行。 CompletableFuture.runAsync(() -> { messageSender.send(SecendKillMq.MQ_EXCHANGE_SECENDKILL,
SecendKillMq.ROUTING_KEY_SECENDKILL,
1 2 ((ServiceSecOrder) secOrder).getService().getId()); });
/**
 * Matches one queued user with one queued stock item in a single Lua script.
 *
 * <p>When both queues are non-empty and the stock counter is positive, the script pops a
 * user id and a stock entry, records the pair in the result hash, decrements the stock
 * counter and increments the per-user purchase counter {@code serviceId:userId}.
 *
 * @param userKey    list of waiting user ids (KEYS[1])
 * @param serviceKey list of stock entries (KEYS[2])
 * @param userResult hash receiving {@code userId -> stock entry} (KEYS[3])
 * @param countKey   remaining-stock counter (KEYS[4])
 * @param serviceId  prefix of the per-user purchase counter (KEYS[5])
 * @return the user's new purchase count, or 0 when nothing was matched
 */
public Object secondKill(String userKey, String serviceKey, String userResult, String countKey, String serviceId) {
    // "or 0": GET on a missing key yields false inside Lua, and tonumber(false) is nil, so the
    // comparison used to raise a script error; now a missing counter simply means "no stock".
    // A space also separates the rpop and hset statements, which were previously fused.
    String script = "if redis.call('llen',KEYS[1]) > 0 and redis.call('llen',KEYS[2]) > 0 "
            + "and tonumber(redis.call('get',KEYS[4]) or 0) > 0 then "
            + "local userid = redis.call('rpop',KEYS[1]) "
            + "redis.call('hset',KEYS[3],userid,redis.call('rpop',KEYS[2])) "
            + "redis.call('decr',KEYS[4]) "
            + "return redis.call('incr',KEYS[5]..':'..userid) else return 0 end";
    return execute(jedis -> jedis.eval(script, 5, userKey, serviceKey, userResult, countKey, serviceId));
}
秒杀服务线程唤醒后,继续执行
*/
}
}
}
1 2 3 4 5 log.info("中断被唤醒,继续运行"); //如果用户队列和服务队列均有数据 while (redisService.llen(userKey) > 0 && redisService.llen(serviceKey) > 0) { //匹配出秒杀结果,并扣减服务数量,增加用户秒杀过该服务的数量 redisService.secondKill(userKey, serviceKey, "UserResult" + service.getId(), countKey,String.valueOf(service.getId()));
}
1 2 3 4 //如果服务队列为空,表示被秒杀完了,从用户队列弹出用户,告知秒杀失败 while (redisService.llen(serviceKey).equals(0L)) { redisService.hset("UserResult" + service.getId(),redisService.rpop(userKey),"秒杀失败"); if (redisService.llen(userKey).equals(0L)) {
}
1 //当前时间已经超出了秒杀时间段,结束while(true)无限循环
}
}
1 2 })).get(); } catch (ExecutionException e) {
}
}
当有用户秒杀到服务,或者服务被秒杀完时,下单的用户需要知道自己是秒杀成功还是秒杀失败。
*/
}
}
}
}
}
1 2 3 4 5 6 7 8 }); LocalDateTime start = LocalDateTime.now(); //从redis的匹配结果获取当前用户的秒杀结果 Future<String> future = CompletableFuture.supplyAsync(() -> { if (redisService.hexists("UserResult" + ((ServiceSecOrder) secOrder).getService().getId(), ((ServiceSecOrder) secOrder).getUser().getId() + "")) { return redisService.hget("UserResult" + ((ServiceSecOrder) secOrder).getService().getId(), ((ServiceSecOrder) secOrder).getUser().getId() + "");
}
1 2 if (LocalDateTime.now().isAfter(start.plusSeconds(3))) { return "秒杀失败";
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 }); try { if (future.get(3000 , TimeUnit.MILLISECONDS).equals("秒杀失败" )) { log.info(((ServiceSecOrder) secOrder).getUser().getUsername() + "秒杀服务" + ((ServiceSecOrder) secOrder).getService().getServiceName() + "失败" ); } else { + ((ServiceSecOrder) secOrder).getService().getServiceName() + "成功" ); SecOrderDao secOrderDao = SpringBootUtil.getBean(SecOrderDao.class); secOrderDao.saveServiceSecOrder((ServiceSecOrder) secOrder); ServicePayBackSender servicePayBackSender = SpringBootUtil.getBean(ServicePayBackSender.class); servicePayBackSender.send(SecendKillMq.MQ_EXCHANGE_DEAD,SecendKillMq.ROURING_KEY_DEAD,(ServiceSecOrder) secOrder); }); + ((ServiceSecOrder) secOrder).getService().getServiceName() + "成功" ;
}
}
}
这里利用 RabbitMQ 的消息过期时间加死信队列来实现延迟队列,配置如下
1 public class RabbitmqConfig {
1 2 3 4 5 public List<Queue> secendKillQueues () { List<Queue> queues = new ArrayList <>(); for (int i = 1 ;i < 11 ;i++) { Queue queue = new Queue (SecendKillMq.SECENDKILL_QUEUE + "_" + i); queues.add(queue);
}
}
1 2 public TopicExchange secendKillExchange () { return new TopicExchange (SecendKillMq.MQ_EXCHANGE_SECENDKILL);
}
1 2 3 4 5 public List<Binding> bingingSecondKill () { List<Binding> bindings = new ArrayList <>(); Binding binding = BindingBuilder.bind(secendKillQueues().get(i - 1 )).to(secendKillExchange()) .with(SecendKillMq.ROUTING_KEY_SECENDKILL + "_" + i); bindings.add(binding);
}
}
1 2 3 4 5 public Queue deadQueue () { Map<String,Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" ,SecendKillMq.MQ_EXCHANGE_DEAD); arguments.put("x-dead-letter-routing-key" ,SecendKillMq.ROUTING_KEY_PAYBACK); return new Queue (SecendKillMq.DEAD_QUEUE,true ,false ,false ,arguments);
}
1 2 public DirectExchange deadExchange () { return new DirectExchange (SecendKillMq.MQ_EXCHANGE_DEAD);
}
1 2 3 public Binding bindingDeadExchange () { return BindingBuilder.bind(deadQueue()).to(deadExchange()) .with(SecendKillMq.ROURING_KEY_DEAD);
}
1 2 public Queue payBackQueue () { return new Queue (SecendKillMq.PAYBACK_QUEUE,true ,false ,false );
}
1 2 3 public Binding bindingPayBack () { return BindingBuilder.bind(payBackQueue()).to(deadExchange()) .with(SecendKillMq.ROUTING_KEY_PAYBACK);
}
} 消息生产者ServicePayBackSender如下
1 public class ServicePayBackSender implements RabbitTemplate .ConfirmCallback,RabbitTemplate.ReturnCallback {
1 private RabbitTemplate rabbitTemplate;
1 2 3 4 5 6 7 8 public void send (String exchange,String routingKey,Object content) { log.info("send content=" + content); this .rabbitTemplate.setMandatory(true ); this .rabbitTemplate.setConfirmCallback(this ); this .rabbitTemplate.setReturnCallback(this ); MessagePostProcessor processor = message -> { message.getMessageProperties().setExpiration(300000 + "" ); return message;
};
1 this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content),processor);
}
1 2 3 4 5 /** * 确认后回调: * @param correlationData * @param ack * @param cause
*/
1 2 3 4 public void confirm (CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.info("send ack fail, cause = " + cause); log.info("send ack success" );
}
*
1 2 3 4 5 * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey
*/
1 2 public void returnedMessage (Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("send fail return-message = " + new String (message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
}
1 2 3 /** * 对消息对象进行二进制序列化 * @param o
*/
1 2 3 4 5 6 private byte [] serialize(Object o) { ByteArrayOutputStream stream = new ByteArrayOutputStream (); Output output = new Output (stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray();
}
消费者如下
1 2 @RabbitListener(queues = SecendKillMq.PAYBACK_QUEUE) public class ServicePayBackDeal {
1 private SecOrderDao secOrderDao;
1 2 3 4 5 6 7 8 try { //告诉服务器收到这条消 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 ServiceSecOrder order = unSerialize(data); if (secOrderDao.countServiceSecOrderHasPay(order) == 0) { String key = order.getService().getId() + order.getService().getTimeSegment().toString(); String countKey = order.getService().getId() + ":count"; String countLimit = order.getService().getId() + ":" + order.getUser().getId(); redisService.unPayedBack(key, JSONObject.toJSONString(order.getService()),countKey,countLimit);
}
}
*/
1 2 3 private ServiceSecOrder unSerialize (byte [] data) { try { return kryo.readObject(input,ServiceSecOrder.class);
}
}
unPayedBack也是一段redis-lua,代码如下
1 2 3 4 5 public Object unPayedBack (String serviceKey,String serviceValue, String countKey,String limitCountKey) { String script = "redis.call('lpush',KEYS[1],ARGV[1]) " + "redis.call('incr',KEYS[2]) " + "return redis.call('decr',KEYS[3])" ; return execute(jedis -> jedis.eval(script,3 ,serviceKey,countKey,limitCountKey,serviceValue));
}
1 2 3 在秒杀结束后唤醒所有的秒杀中断,退出while(true)的无限循环 /** * 在每个秒杀段最后一分钟唤醒所有秒杀中断,每天的1点59分开始,每2小时执行一次
*/
1 2 @ElasticSimpleJob(cron="0 59 1/2 * * ?",jobName="signal",shardingTotalCount=2,jobParameter="日期",shardingItemParameters="0=Signal0,1=Signal1") public class SignalJob implements SimpleJob {
1 private MessageSender messageSender;
1 2 3 4 5 segmentList.stream().filter(timeSegment -> LocalDateTime.now().plusMinutes(-2L).isAfter(timeSegment.getBeginTime()) && LocalDateTime.now().plusMinutes(-2L).isBefore(timeSegment.getEndTime())) //扁平化所有的秒杀服务 service.getId()); });
}
现在我们来建立下单的Controller。先建立一个秒杀单工厂接口:
1 2 3 public interface SecOrderFactory {
*/
1 public SecOrder createSecOrder () ;
*/
1 public SecOrder getSecOrder () ;
} Service版本标签
1 2 3 4 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface ServiceSecOrderVersion { int value () ;
} Service工厂实现类
1 2 3 public class ServiceSecOrderFactory implements SecOrderFactory { private Set<Class<?>> classes = ClassUtil.getClassSet("com.cloud.secondkill.domain" ); private SecOrder createdSecOrder;
1 2 private void init () { this .createdSecOrder = createSecOrder();
}
1 2 3 4 5 6 7 8 9 10 11 12 public SecOrder createSecOrder () { Object instance = null ; try { instance = classes.stream().filter(clazz -> clazz.isAnnotationPresent(ServiceSecOrderVersion.class)) .filter(clazz -> SecOrder.class.isAssignableFrom(clazz)) .max(Comparator.comparingInt(clazz -> clazz.getAnnotation(ServiceSecOrderVersion.class).value())) .get().newInstance(); } catch (InstantiationException e) { } catch (IllegalAccessException e) {
}
1 return (SecOrder) instance;
}
1 2 public SecOrder getSecOrder () { return createdSecOrder;
}
秒杀单的Bean
1 public class SecOrderBean {
1 private Map<String,Class> secOrderFactoryMap = new HashMap <>();
1 private Map<String,Class> secOrderMap = new HashMap <>();
1 2 3 4 5 Set<Class<?>> classes = ClassUtil.getClassSet("com.cloud.secondkill.domain" ); classes.stream().filter(clazz -> SecOrderFactory.class.isAssignableFrom(clazz)) .forEach(clazz -> secOrderFactoryMap.put(clazz.getSimpleName(),clazz)); classes.stream().filter(clazz -> SecOrder.class.isAssignableFrom(clazz)) .forEach(clazz -> secOrderMap.put(clazz.getSimpleName(),clazz));
}
下单Controller
1 2 3 public class SecOrderController { private ThreadLocal<SecOrderFactory> secOrderFactory = new ThreadLocal <>(); private ThreadLocal<SecOrder> secOrderService = new ThreadLocal <>();
1 private SecOrderBean secOrderBean;
1 2 3 4 5 6 7 8 9 10 11 12 @SuppressWarnings("unchecked") @Transactional @PostMapping("/makesecorder") @LxRateLimit(perSecond = 500,timeOut = 500) public Result<String> makeSecOrder (@RequestBody String secOrderStr, @RequestParam("type") String type) throws Exception { log.info(secOrderStr); try { SecOrder secOrder = setSecOrderFactory(secOrderStr, type); String secResult = this .secOrderService.get().makeOrder(secOrder); return Result.success(secResult); secOrderFactory.remove(); secOrderService.remove();
}
1 2 3 4 5 private SecOrder setSecOrderFactory (String secOrderStr,String type) { Class classType = secOrderBean.getSecOrderMap().get(type); Object secOrder = JSONObject.parseObject(secOrderStr, classType); setSecOrderFactory(type); return (SecOrder) secOrder;
}
1 2 3 4 private void setSecOrderFactory (String type) { Class classFactoryType = secOrderBean.getSecOrderFactoryMap().get(type + "Factory" ); this .secOrderFactory.set((SecOrderFactory) SpringBootUtil.getBean(classFactoryType)); this .secOrderService.set(this .secOrderFactory.get().getSecOrder());
}
1 2 3 4 @LxRateLimit 为接口限流 @Target(ElementType.METHOD) public @interface LxRateLimit {
*/
1 String value() default "" ;
1 2 /** * 每秒向桶中放入令牌的数量 默认最大即不做限流
*/
1 double perSecond() default Double.MAX_VALUE;
*/
1 int timeOut() default 0 ;
*/
1 TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS;
} AOP拦截
1 2 public class LxRateLimitAspect { private RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);
*/
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @ResponseBody @Around(value = "@annotation(com.cloud.secondkill.annotion.LxRateLimit)") public Object aroundNotice (ProceedingJoinPoint pjp) throws Throwable { log.info("拦截到了{}方法。.." , pjp.getSignature().getName()); Signature signature = pjp.getSignature(); MethodSignature methodSignature = (MethodSignature)signature; Method targetMethod = methodSignature.getMethod(); if (targetMethod.isAnnotationPresent(LxRateLimit.class)) { LxRateLimit lxRateLimit = targetMethod.getAnnotation(LxRateLimit.class); rateLimiter.setRate(lxRateLimit.perSecond()); if (!rateLimiter.tryAcquire(lxRateLimit.timeOut(), lxRateLimit.timeOutUnit())) return "服务器繁忙,请稍后再试!" ;
}
}
现在要加入秒杀的幂等处理,来防止使用工具秒杀。先看获取 token 的 Controller:
1 public class TokenController {
1 2 3 4 5 6 7 8 9 @GetMapping("/gettoken") public Map getToken (@RequestParam("url") String url) { Map<String,String> tokenMap = new HashMap <>(); String tokenValue = UUID.randomUUID().toString(); AppUser user = AppUserUtil.getLoginAppUser(); String key = url + user.getId(); tokenMap.put(key,tokenValue); redisService.set(key,tokenValue); return tokenMap;
}
配置幂等Spring MVC拦截器
1 public class TokenInterceptor implements HandlerInterceptor {
1 2 3 4 5 6 public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String tokenName = request.getRequestURI() + user.getId(); String tokenValue = request.getParameter("token_value" ); if (tokenValue != null && !tokenValue.equals("" )) { log.info("tokenName:{},tokenValue:{}" ,tokenName,tokenValue); return handleToken(request,response,handler,user);
}
}
1 2 3 public void postHandle (HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable ModelAndView modelAndView) throws Exception { if (redisService.exists(request.getParameter("token_value" ))) { RedisTool.releaseDistributedLock(redisService, request.getParameter("token_value" ), request.getParameter("token_value" ));
}
1 public void afterCompletion (HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable Exception ex) throws Exception {
}
1 2 3 4 5 6 /** * 分布式锁处理 * @param request * @param response * @param handler * @throws Exception
*/
1 2 3 4 5 6 7 private boolean handleToken (HttpServletRequest request, HttpServletResponse response, Object handler,AppUser user) throws Exception { if (RedisTool.tryGetDistributedLock(redisService,request.getParameter("token_value" ),request.getParameter("token_value" ),180 )) { if (redisService.exists(request.getRequestURI() + user.getId())) { if (redisService.get(request.getRequestURI() + user.getId()).equals(request.getParameter("token_value" ))) {
1 2 3 4 5 redisService.del(request.getRequestURI() + user.getId()); log.info("放行"); RedisTool.releaseDistributedLock(redisService,request.getParameter("token_value"),request.getParameter("token_value")); //放行 return true;
}
1 2 3 log.info("拦截"); //当请求的url与token与redis中的存储不相同时,解除锁定 //进行拦截
}
}
1 @SpringBootConfiguration
1 public class TokenInterceptorConfig extends WebMvcConfigurerAdapter {
1 private TokenInterceptor tokenInterceptor;
1 2 public void addInterceptors (InterceptorRegistry registry) { registry.addInterceptor(tokenInterceptor).addPathPatterns("/makesecorder" );
}
本文标题: 分布式秒杀
发布时间: 2022年01月12日 00:00
最后更新: 2025年12月30日 08:54
原始链接: https://haoxiang.eu.org/b5955db7/
版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0 许可协议,转载请注明出处!