[Azure Media Services] The official Media Services sample Audio Analyzer hangs at "Creating event processor host .." until it times out - LuBu0505/My-Code GitHub Wiki
While running the official Azure Media Services sample (media-services-v3-java --> AudioAnalytics --> **AudioAnalyzer**), I added an Event Hub and a Storage Account as the documentation describes, and used Event Grid to track the state of the job.
Analyze a media file with an audio analyzer preset
Please note, there are costs for using Event Hub. For more details, refer to Event Hubs pricing and FAQ
Enable Event Grid resource provider
az provider register --namespace Microsoft.EventGrid
To check if registered, run the next command. You should see "Registered"
az provider show --namespace Microsoft.EventGrid --query "registrationState"
Create an Event Hub
namespace=<unique-namespace-name>
hubname=<event-hub-name>
az eventhubs namespace create --name $namespace --resource-group <resource-group>
az eventhubs eventhub create --name $hubname --namespace-name $namespace --resource-group <resource-group>
Subscribe to Media Services events
hubid=$(az eventhubs eventhub show --name $hubname --namespace-name $namespace --resource-group <resource-group> --query id --output tsv)
amsResourceId=$(az ams account show --name <ams-account> --resource-group <resource-group> --query id --output tsv)
az eventgrid event-subscription create --resource-id $amsResourceId --name <event-subscription-name> --endpoint-type eventhub --endpoint $hubid
Create a storage account and container for Event Processor Host if you don't have one (see "Create a storage account for Event Processor Host").
Update appsettings.json with your Event Hub and Storage information:
StorageAccountName: The name of your storage account.
StorageAccountKey: The access key for your storage account. In the Azure portal, go to "All resources", search for your storage account, open "Access keys", and copy key1.
StorageContainerName: The name of your container. Click Blobs in your storage account, find your container, and copy the name.
EventHubConnectionString: The Event Hub connection string. Search for the namespace you just created, then go to Shared access policies -> RootManageSharedAccessKey -> Connection string-primary key.
EventHubName: The Event Hub name. Your namespace -> Event Hubs.
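However, after completing the configuration as documented, running the code stalls for a long time. According to the log output, it stalls at "Creating an event processor host to process events from Event Hub...:" until the timeout is reached.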
Creating a transform...
Transform created
Creating an input asset...
Uploading a media file to the asset...
Creating a job...
Creating an event processor host to process events from Event Hub...:2022-10-02T12:09:05.694 Timeout happened.
Job final state received, unregistering event processor...
Job elapsed time: 1800 second(s).
Job finished.
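Why does this happen?
How can the stall be fixed?
Because the code above uses Azure Event Hub, we first need to understand how a client receives data from Event Hub.
Put simply, Event Hub is a message relay: clients have to send and receive messages themselves.
In this example, Event Grid subscribes to the output of the Media Service job, and the service sends it to Event Hub automatically. The AudioAnalyzer code therefore only handles receiving messages.
AudioAnalyzer.java declares the wrapper object MediaServicesEventProcessor.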
// Create a event processor host to process events from Event Hub.
Object monitor = new Object();
eventProcessorHost = new MediaServicesEventProcessor(jobName, monitor, null,
        config.getEventHubConnectionString(), config.getEventHubName(),
        container);

// Define a task to wait for the job to finish.
Callable<String> jobTask = () -> {
    synchronized (monitor) {
        monitor.wait();
    }
    return "Job";
};
MediaServicesEventProcessor.java initializes the event processor host object, using the official Azure class com.azure.messaging.eventhubs.EventProcessorClient.
public MediaServicesEventProcessor(String jobName, Object monitor, String liveEventName,
        String eventHubConnectionString, String eventHubName,
        BlobContainerAsyncClient container) {
    this.eventHubConnectionString = eventHubConnectionString;
    this.eventHubName = eventHubName;
    this.blobContainer = container;
    if (jobName != null) {
        this.jobName = jobName.replaceAll("-", "");
    } else {
        this.jobName = null;
    }
    this.monitor = buildEventProcessClient();
    monitor = this.monitor;
    if (liveEventName != null) {
        this.liveEventName = liveEventName.replaceAll("-", "");
    } else {
        this.liveEventName = null;
    }
}
...
private EventProcessorClient buildEventProcessClient() {
    return new EventProcessorClientBuilder()
        .connectionString(this.eventHubConnectionString, this.eventHubName)
        .checkpointStore(new BlobCheckpointStore(this.blobContainer))
        .consumerGroup("$Default")
        .processEvent(eventContext -> this.processEvent(eventContext))
        .processError(errorContext -> System.out.println("Partition "
            + errorContext.getPartitionContext().getPartitionId() + " onError: " + errorContext.getThrowable().toString()))
        .processPartitionInitialization(initializationContextConsumer -> System.out.println("Partition "
            + initializationContextConsumer.getPartitionContext().getPartitionId() + " is opening"))
        .processPartitionClose(closeContext -> System.out.println("Partition "
            + closeContext.getPartitionContext().getPartitionId() + " is closing for reason " + closeContext.getCloseReason().toString()))
        .buildEventProcessorClient();
}
However, comparing this with the Event Hub sample code for receiving messages shows that the most important call, start, is missing:
System.out.println("Starting event processor");
eventProcessorClient.start();
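Because the Event Processor Client object is never started, the code cannot receive any message from Event Hub, and it keeps waiting until the configured timeout (30 minutes) expires. This is the root cause of the long stall.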
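The stall can be reproduced without any Azure dependency. The sketch below is a minimal illustration, not the sample's code: it assumes the sample waits on the monitor through a Future with a timeout, and the class and method names (MonitorTimeoutDemo, waitForJob) are hypothetical. Nothing ever calls notify on the monitor, which matches the situation where the processor is built but never started, so the wait ends only when the timeout expires.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration only: no Azure SDK involved, all names are hypothetical.
class MonitorTimeoutDemo {

    /**
     * Waits on the monitor the way the sample's jobTask does.
     * Returns "Job" if the monitor is notified in time, "Timeout" otherwise.
     */
    static String waitForJob(Object monitor, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<String> jobTask = () -> {
            synchronized (monitor) {
                monitor.wait(); // released only by notify()/notifyAll() on the monitor
            }
            return "Job";
        };
        Future<String> future = executor.submit(jobTask);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "Timeout";
        } catch (InterruptedException | ExecutionException e) {
            return "Error: " + e;
        } finally {
            executor.shutdownNow(); // interrupt the waiting task so the JVM can exit
        }
    }

    public static void main(String[] args) {
        // No event handler is running, so nobody notifies the monitor.
        System.out.println(waitForJob(new Object(), 500));
    }
}
```

With a 30-minute timeout in place of the 500 ms used here, the same pattern would account for the "Job elapsed time: 1800 second(s)" line in the log.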
The fix is simple: add a start method to **MediaServicesEventProcessor.java** and call it from AudioAnalyzer.java.
1: Add start to **MediaServicesEventProcessor.java**
2: Call start in AudioAnalyzer.java
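The two steps might look roughly like the following self-contained sketch. It is not the repository's code: ProcessorClient is a hypothetical stand-in for com.azure.messaging.eventhubs.EventProcessorClient (which provides start() and stop()), EventProcessorWrapper is a simplified stand-in for MediaServicesEventProcessor, and a CountDownLatch replaces the sample's monitor.wait() so the example cannot miss a notification.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for com.azure.messaging.eventhubs.EventProcessorClient.
interface ProcessorClient {
    void start();
    void stop();
}

// Hypothetical, simplified stand-in for MediaServicesEventProcessor.
class EventProcessorWrapper {
    private final ProcessorClient client;

    EventProcessorWrapper(ProcessorClient client) {
        this.client = client;
    }

    // Step 1: expose start (and stop) so callers can begin receiving events.
    void start() {
        client.start();
    }

    void stop() {
        client.stop();
    }
}

class StartFixDemo {

    /** Returns true if the "job finished" signal arrives within the timeout. */
    static boolean runJob(boolean callStart, long timeoutMillis) {
        CountDownLatch jobFinished = new CountDownLatch(1);

        // Fake client: delivers one event, but only after start() is called.
        ProcessorClient fake = new ProcessorClient() {
            public void start() {
                new Thread(jobFinished::countDown).start();
            }

            public void stop() {
                // nothing to release in this fake
            }
        };

        EventProcessorWrapper processor = new EventProcessorWrapper(fake);
        if (callStart) {
            processor.start(); // Step 2: the caller starts the processor before waiting.
        }
        try {
            return jobFinished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            processor.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("without start: " + runJob(false, 300));
        System.out.println("with start:    " + runJob(true, 300));
    }
}
```

In the real sample, the wrapper's start would presumably delegate to the EventProcessorClient returned by buildEventProcessClient(), and AudioAnalyzer.java would call it right after constructing the processor and before waiting for the job.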
After the change, restart the program and it receives the current job's state from Event Hub.
Full sample code: https://github.com/LuBu0505/media-services-v3-java/tree/main/AudioAnalytics/AudioAnalyzer
Send events to or receive events from Azure Event Hubs using Java (azure-messaging-eventhubs): https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send#receive-events
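When facing a problem in a complex environment, the way to investigate is this: muddy water, left still, slowly clears; what is at rest, set in motion, slowly comes to life. In the cloud, it is exactly so!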