Commit 8d2272ed by 程裕兵

feat:delay task code add 同步半屏小程序添加状态

parent a158089f
...@@ -40,6 +40,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; ...@@ -40,6 +40,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
"com.jiejing.fitness.rpc.api", "com.jiejing.fitness.rpc.api",
"com.jiejing.permcenter.api", "com.jiejing.permcenter.api",
"com.jiejing.configcenter.api", "com.jiejing.configcenter.api",
"com.jiejing.workflow.api",
}) })
@SpringBootApplication(scanBasePackages = "com.jiejing", exclude = { @SpringBootApplication(scanBasePackages = "com.jiejing", exclude = {
DruidDataSourceAutoConfigure.class DruidDataSourceAutoConfigure.class
......
...@@ -96,6 +96,11 @@ ...@@ -96,6 +96,11 @@
<artifactId>configcenter-api</artifactId> <artifactId>configcenter-api</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>com.jiejing.base</groupId>
<artifactId>workflow-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- ======================= jiejing api end ======================= --> <!-- ======================= jiejing api end ======================= -->
......
...@@ -44,6 +44,10 @@ ...@@ -44,6 +44,10 @@
<artifactId>configcenter-api</artifactId> <artifactId>configcenter-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.jiejing.base</groupId>
<artifactId>workflow-api</artifactId>
</dependency>
<dependency>
<groupId>com.jiejing.event</groupId> <groupId>com.jiejing.event</groupId>
<artifactId>scs-event</artifactId> <artifactId>scs-event</artifactId>
</dependency> </dependency>
......
...@@ -11,6 +11,7 @@ import com.jiejing.paycenter.common.event.MerchantEvent; ...@@ -11,6 +11,7 @@ import com.jiejing.paycenter.common.event.MerchantEvent;
import com.jiejing.paycenter.common.event.MerchantSubChannelEvent; import com.jiejing.paycenter.common.event.MerchantSubChannelEvent;
import com.jiejing.paycenter.common.event.PayEvent; import com.jiejing.paycenter.common.event.PayEvent;
import com.jiejing.paycenter.common.event.RefundEvent; import com.jiejing.paycenter.common.event.RefundEvent;
import com.jiejing.workflow.event.DelayTaskEvent;
import com.xiaomai.event.annotation.EventHandler; import com.xiaomai.event.annotation.EventHandler;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Optional; import java.util.Optional;
...@@ -90,4 +91,14 @@ public class ListenerService { ...@@ -90,4 +91,14 @@ public class ListenerService {
} }
} }
@EventHandler(value = DelayTaskEvent.class, binder = "biz-kafka", maxAttempts = MAX_RETRY)
public void delayTaskEventCallback(DelayTaskEvent event, @Header(DELIVERY_ATTEMPT) int retryNum) {
try {
log.info("start process delayTaskEventCallback {}", JSON.toJSONString(event));
studioMerchantService.syncEmbeddedXcx(event.getId());
} catch (Exception e) {
log.info("process delayTaskEventCallback fail {}", event, e);
}
}
} }
...@@ -191,4 +191,9 @@ public interface StudioMerchantService { ...@@ -191,4 +191,9 @@ public interface StudioMerchantService {
*/ */
void syncEmbeddedXcx(); void syncEmbeddedXcx();
/**
* 同步半屏小程序状态
*/
void syncEmbeddedXcx(Long id);
} }
package com.jiejing.fitness.finance.service.merchant.convert; package com.jiejing.fitness.finance.service.merchant.convert;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.jiejing.common.config.idgen.adapter.IdWorker; import com.jiejing.common.config.idgen.adapter.IdWorker;
import com.jiejing.common.utils.time.TimeUtil;
import com.jiejing.fitness.enums.delay.DelayTaskCodeEnum;
import com.jiejing.fitness.enums.finance.EmbededXcxEnum; import com.jiejing.fitness.enums.finance.EmbededXcxEnum;
import com.jiejing.fitness.finance.api.merchant.vo.StudioMerchantApplyVO; import com.jiejing.fitness.finance.api.merchant.vo.StudioMerchantApplyVO;
import com.jiejing.fitness.finance.repository.entity.StudioEmbeddedXcxApply; import com.jiejing.fitness.finance.repository.entity.StudioEmbeddedXcxApply;
import com.jiejing.wechat.vo.xcx.HalfScreenXcxAuthVO.AuthXcxInfo; import com.jiejing.wechat.vo.xcx.HalfScreenXcxAuthVO.AuthXcxInfo;
import com.jiejing.workflow.api.request.BatchSaveDelayTaskRequest;
import com.jiejing.workflow.api.request.BatchSaveDelayTaskRequest.TaskInfo;
import com.jiejing.workflow.common.enums.DelayTopicEnum;
import java.time.temporal.ChronoUnit;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -91,4 +99,22 @@ public class StudioEmbeddedXcxConvert { ...@@ -91,4 +99,22 @@ public class StudioEmbeddedXcxConvert {
toInit.setUpdateTime(new Date()); toInit.setUpdateTime(new Date());
return toInit; return toInit;
} }
public static BatchSaveDelayTaskRequest toDelayTask(StudioEmbeddedXcxApply apply, String appId) {
// 同步两次即可,分别是申请后30秒,申请后5分钟。两次都没有最新状态,则认为本次申请发生异常可以人工介入
JSONObject body = new JSONObject().fluentPut("appId", appId);
return BatchSaveDelayTaskRequest.builder()
.topic(DelayTopicEnum.COMMON_DELAY_TASK)
.tenantId(apply.getStudioId())
.bizCode(DelayTaskCodeEnum.EMBEDDED_XCX.getCode())
.batch(Lists.newArrayList(
TaskInfo.builder().bizId(apply.getId())
.executionTime(TimeUtil.local().plus(apply.getCreateTime(), 30, ChronoUnit.SECONDS))
.taskBody(body).build(),
TaskInfo.builder().bizId(apply.getId())
.executionTime(TimeUtil.local().plus(apply.getCreateTime(), 5, ChronoUnit.MINUTES))
.taskBody(body).build()
))
.build();
}
} }
...@@ -36,6 +36,7 @@ import com.jiejing.fitness.finance.service.merchant.convert.MerchantConvert; ...@@ -36,6 +36,7 @@ import com.jiejing.fitness.finance.service.merchant.convert.MerchantConvert;
import com.jiejing.fitness.finance.service.merchant.convert.StudioEmbeddedXcxConvert; import com.jiejing.fitness.finance.service.merchant.convert.StudioEmbeddedXcxConvert;
import com.jiejing.fitness.finance.service.merchant.params.ApplyStudioMerchantParams; import com.jiejing.fitness.finance.service.merchant.params.ApplyStudioMerchantParams;
import com.jiejing.fitness.finance.service.merchant.params.PageStudioMerchantApplyParams; import com.jiejing.fitness.finance.service.merchant.params.PageStudioMerchantApplyParams;
import com.jiejing.fitness.finance.service.rpc.DelayTaskRpcService;
import com.jiejing.fitness.finance.service.rpc.MerchantRpcService; import com.jiejing.fitness.finance.service.rpc.MerchantRpcService;
import com.jiejing.fitness.finance.service.rpc.MessageRpcService; import com.jiejing.fitness.finance.service.rpc.MessageRpcService;
import com.jiejing.fitness.finance.service.rpc.ResourceRpcService; import com.jiejing.fitness.finance.service.rpc.ResourceRpcService;
...@@ -125,6 +126,9 @@ public class StudioMerchantServiceImpl implements StudioMerchantService { ...@@ -125,6 +126,9 @@ public class StudioMerchantServiceImpl implements StudioMerchantService {
@Resource @Resource
private WeXcxService weXcxService; private WeXcxService weXcxService;
@Resource
private DelayTaskRpcService delayTaskRpcService;
@Resource(name = "financeThreadPool") @Resource(name = "financeThreadPool")
private Executor executor; private Executor executor;
...@@ -301,7 +305,7 @@ public class StudioMerchantServiceImpl implements StudioMerchantService { ...@@ -301,7 +305,7 @@ public class StudioMerchantServiceImpl implements StudioMerchantService {
Integer code = result.getInteger("errcode"); Integer code = result.getInteger("errcode");
String message = result.getString("errmsg"); String message = result.getString("errmsg");
if (WX_SUCCESS_CODE.equals(code)) { if (WX_SUCCESS_CODE.equals(code)) {
this.doComplete(entity.getId(), appId); this.doComplete(entity, appId);
} else { } else {
studioEmbeddedXcxApplyRpService.updateById( studioEmbeddedXcxApplyRpService.updateById(
StudioEmbeddedXcxConvert.toRefuse(entity.getId(), message)); StudioEmbeddedXcxConvert.toRefuse(entity.getId(), message));
...@@ -316,7 +320,7 @@ public class StudioMerchantServiceImpl implements StudioMerchantService { ...@@ -316,7 +320,7 @@ public class StudioMerchantServiceImpl implements StudioMerchantService {
}); });
} }
private void doComplete(Long id, String appId) { private void doComplete(StudioEmbeddedXcxApply apply, String appId) {
HalfScreenXcxAuthVO vo = weXcxService.listBoundEmbeddedXcx(wxComponentAppId, appId, 0, 100); HalfScreenXcxAuthVO vo = weXcxService.listBoundEmbeddedXcx(wxComponentAppId, appId, 0, 100);
if (null == vo) { if (null == vo) {
return; return;
...@@ -332,15 +336,17 @@ public class StudioMerchantServiceImpl implements StudioMerchantService { ...@@ -332,15 +336,17 @@ public class StudioMerchantServiceImpl implements StudioMerchantService {
EmbededXcxEnum state = EmbededXcxEnum.getByWxCode(Integer.parseInt(info.getStatus())); EmbededXcxEnum state = EmbededXcxEnum.getByWxCode(Integer.parseInt(info.getStatus()));
switch (state) { switch (state) {
case SUCCESS: case SUCCESS:
studioEmbeddedXcxApplyRpService.updateById(StudioEmbeddedXcxConvert.toSuccess(id)); studioEmbeddedXcxApplyRpService.updateById(StudioEmbeddedXcxConvert.toSuccess(apply.getId()));
break; break;
case REFUSE: case REFUSE:
case TIMEOUT: case TIMEOUT:
case REVOKE: case REVOKE:
case CANCEL: case CANCEL:
studioEmbeddedXcxApplyRpService.updateById(StudioEmbeddedXcxConvert.toFail(id, state)); studioEmbeddedXcxApplyRpService.updateById(StudioEmbeddedXcxConvert.toFail(apply.getId(), state));
break; break;
default: default:
// 定时任务同步状态
delayTaskRpcService.batchSave(StudioEmbeddedXcxConvert.toDelayTask(apply, appId));
break; break;
} }
} }
...@@ -356,7 +362,7 @@ public class StudioMerchantServiceImpl implements StudioMerchantService { ...@@ -356,7 +362,7 @@ public class StudioMerchantServiceImpl implements StudioMerchantService {
applies.forEach(apply -> { applies.forEach(apply -> {
log.info("start sync embedded xcx {}, {}", apply.getId(), apply.getAuthorizerAppId()); log.info("start sync embedded xcx {}, {}", apply.getId(), apply.getAuthorizerAppId());
try { try {
this.doComplete(apply.getId(), apply.getAuthorizerAppId()); this.doComplete(apply, apply.getAuthorizerAppId());
} catch (Exception e) { } catch (Exception e) {
log.info("fail sync embedded xcx {}, {}", apply.getId(), apply.getAuthorizerAppId(), e); log.info("fail sync embedded xcx {}, {}", apply.getId(), apply.getAuthorizerAppId(), e);
} }
...@@ -365,6 +371,23 @@ public class StudioMerchantServiceImpl implements StudioMerchantService { ...@@ -365,6 +371,23 @@ public class StudioMerchantServiceImpl implements StudioMerchantService {
} }
@Override @Override
public void syncEmbeddedXcx(Long id) {
StudioEmbeddedXcxApply apply = studioEmbeddedXcxApplyRpService.getById(id).orElse(null);
if (null == apply) {
return;
}
EmbededXcxEnum state = EmbededXcxEnum.valueOf(apply.getState());
if (EmbededXcxEnum.INIT != state) {
return;
}
try {
this.doComplete(apply, apply.getAuthorizerAppId());
} catch (Exception e) {
log.info("sync embedded xcx fail {}, {}", apply.getId(), apply.getAuthorizerAppId(), e);
}
}
@Override
public StudioMerchantApplyVO getApply(Long id) { public StudioMerchantApplyVO getApply(Long id) {
StudioMerchantApply apply = studioMerchantApplyRpService.getById(id) StudioMerchantApply apply = studioMerchantApplyRpService.getById(id)
.orElseThrow(() -> new BizException(FinanceErrorEnums.NOT_EXIST)); .orElseThrow(() -> new BizException(FinanceErrorEnums.NOT_EXIST));
......
package com.jiejing.fitness.finance.service.rpc;
import com.jiejing.common.model.JsonResult;
import com.jiejing.workflow.api.DelayQueueApi;
import com.jiejing.workflow.api.request.BatchSaveDelayTaskRequest;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author chengyubing
* @since 2024/8/15 10:02
*/
@Slf4j
@Service
public class DelayTaskRpcService {
@Resource
private DelayQueueApi delayQueueApi;
public void batchSave(BatchSaveDelayTaskRequest request) {
JsonResult<Void> result = delayQueueApi.batchSave(request);
if (!result.getSuccess()) {
log.error("batch save delay task fail {}", result);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment