Java转码服务 - zhouted/zhouted.github.io GitHub Wiki

记录用java实现转码服务的核心代码。功能包括:把视频转为mp4、把文档转为pdf、生成视频和文档的缩略图等。

ConvertService.java

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import lombok.Synchronized;

@Service
public class ConvertService {
	@Autowired
	ConvertConfig convertConfig;
	@Autowired
	ConvertTaskDao convertTaskDao;
	@Autowired
	ConvertHelper convertHelper;
	@Autowired
	RestTemplate restTemplate;
	
	private static Thread convertThread = null;
	private static Thread notifyThread = null;
	private static ConcurrentLinkedQueue<ConvertTask> queueTasks = new ConcurrentLinkedQueue<>();
	private static ConcurrentHashMap<String, ConvertTask> convertingTasks = new ConcurrentHashMap<>();
	private static ConcurrentLinkedQueue<ConvertTask> notifyTasks = new ConcurrentLinkedQueue<>();
	
	@Synchronized
	public ConvertTask addTask(ConvertTask task) {
		ConvertTask save = convertTaskDao.save(task);
		queueTasks.add(save);
		startConvert();
		return task;
	}
	
	@Synchronized
	private int doConvert() {
		if (convertingTasks.size() >= 4) {
			return 1;
		}
		ConvertTask task = queueTasks.poll();
		if (task == null) {
			return 10;
		}
		try {
			if (convertingTasks.containsKey(task.getSrc())) {//重复
				task.setStatus(ConvertTask.STATUS_IGNORE);
				convertTaskDao.save(task);
				return 0;
			}
			convertingTasks.put(task.getSrc(), task);
			task.setStatus(ConvertTask.STATUS_START);
			task.setStartAt(Instant.now());
			convertTaskDao.save(task);
			
			CompletableFuture<ConvertTask> process = convertHelper.process(task);
			process.thenAccept((retTask) -> {
				retTask.setStatus(ConvertTask.STATUS_FINISH);
				retTask.setEndAt(Instant.now());
				convertTaskDao.save(retTask);
				convertingTasks.remove(task.getSrc());
				notifyTasks.add(retTask);
				startNotify();
				System.out.println(retTask);
			});
		} catch(Exception e) {
			e.printStackTrace();
		}
		return 0;
	}
	
	@Synchronized
	private int doNotify() {
		ConvertTask task = notifyTasks.poll();
		if (task == null) {
			return 10;
		}
		int sleep = 0;
		try {
			ResponseEntity<String> response = restTemplate.postForEntity(convertConfig.getNotify(), task, String.class);
			String success = response.getBody();
			System.out.println(success);
			if (!StringUtils.isNullOrEmpty(success) && success.contains("success")) {
				task.setStatus(ConvertTask.STATUS_NOTIFIED);
				task.setNotify(LocalDateTime.now()+","+success);
			} else {
				task.setNotify(LocalDateTime.now()+",empty");
				notifyTasks.add(task);
				sleep = 1;
			}
		} catch(Exception e) {
			e.printStackTrace();
			task.setNotify(LocalDateTime.now()+","+e.getMessage());
			notifyTasks.add(task);
			sleep = 2;
		}
		convertTaskDao.save(task);
		return sleep;
	}
	
	public void startConvert() {
		if (convertThread != null) {
	        convertThread.interrupt();
			return;
		}
		convertThread = new Thread(() -> {
			while (convertThread == Thread.currentThread()) {
				System.out.println("Convert "+convertThread + " running at " + LocalDateTime.now());
				int sleep = doConvert();
				try {
					Thread.sleep(sleep*1000);
				} catch (InterruptedException e) {
					System.out.println("Convert "+Thread.currentThread() +" "+ e.getMessage());
				}
			}
			System.out.println("Convert "+Thread.currentThread()+" stoped at " + LocalDateTime.now());
		});
		convertThread.start();
		System.out.println("Convert "+convertThread+" started at " + LocalDateTime.now());
		
	}
	
	public void startNotify() {
		if (notifyThread != null) {
			notifyThread.interrupt();
			return;
		}
		notifyThread = new Thread(() -> {
			while (notifyThread == Thread.currentThread()) {
				System.out.println("Notify "+notifyThread + " running at " + LocalDateTime.now());
				int sleep = doNotify();
				try {
					Thread.sleep(sleep*1000);
				} catch (InterruptedException e) {
					System.out.println("Notify "+Thread.currentThread() +" "+ e.getMessage());
				}
			}
			System.out.println("Notify "+Thread.currentThread()+" stoped at " + LocalDateTime.now());
		});
		notifyThread.start();
		System.out.println("Notify "+notifyThread+" started at " + LocalDateTime.now());
	}
	
	@PostConstruct
	private void init() {
		List<ConvertTask> tasks = convertTaskDao.findByStatus(ConvertTask.STATUS_START);
		tasks.forEach(task -> {
			queueTasks.add(task);
		});
		tasks = convertTaskDao.findByStatus(ConvertTask.STATUS_INITIAL);
		tasks.forEach(task -> {
			queueTasks.add(task);
		});
		tasks = convertTaskDao.findByStatus(ConvertTask.STATUS_FINISH);
		tasks.forEach(task -> {
			notifyTasks.add(task);
		});
		startConvert();
		startNotify();
		System.out.println(convertConfig);
	}
	
	@PreDestroy
	public void stop() {
		if (convertThread != null) {
	        convertThread.interrupt();
	        convertThread = null;
		}
		if (notifyThread != null) {
			notifyThread.interrupt();
			notifyThread = null;
		}
	}
}

ConvertTaskDao.java

import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;

public interface ConvertTaskDao  extends JpaRepository<ConvertTask, Integer>{
	List<ConvertTask> findByStatus(@Param("status") Integer status);
}

ConvertTask实体类

public class ConvertTask implements Serializable{
	private static final long serialVersionUID = 1418649194096365375L;
    //状态常量
	public static final Integer STATUS_IGNORE = -1;
	public static final Integer STATUS_INITIAL = 0;
	public static final Integer STATUS_START = 1;
	public static final Integer STATUS_FINISH = 2;
	public static final Integer STATUS_NOTIFIED = 3;
	
	@Id
	Integer id;
	String name;
	Instant createdAt;
	Instant startAt;
	Instant endAt;
	Integer status = STATUS_INITIAL;
	String src;//源文件
	String out;//输出文件
	String img;//缩略图
	String ret;//转换结果
	String notify;//通知结果
}

ConvertHelper.java

基于CmdHelper.java实现,见:https://www.zybuluo.com/TedZhou/note/1716823

import java.awt.image.BufferedImage;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;

import javax.imageio.ImageIO;

import org.apache.commons.exec.CommandLine;
import org.apache.pdfbox.io.IOUtils;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.rendering.PDFRenderer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ConvertHelper {
	static final String EXT_PDF = ".pdf";
	static final String EXT_MP4 = ".mp4";
	static final String EXT_AVI = ".avi";
	static final String EXT_DOC_STR = ".doc,.docx,.ppt,.pptx,.xls,.xlsx,";// 文档后缀
	static final String EXT_VIDEO_FFMPEG_STR = ".avi,.mpg,.wmv,.3gp,.mov,.asf,.asx,.flv,.f4v,.vob,.mkv,.ts,";// ffmpeg能解码的文件格式
	static final String EXT_VIDEO_MENCODER_STR = ".wmv9,.rm,.rmvb,.mpeg,";// 需要先mencoder解码的文件格式。.mpeg无法直接截取图片,所以需要中转

	@Autowired
	ConvertConfig convertConfig;
	
	public static String getExt(String filename) {
		if (StringUtils.isNullOrEmpty(filename)) return "";
		int pos = filename.lastIndexOf(".");
		if (pos < 0) return "";
		return filename.substring(pos).toLowerCase();
	}
	
	public static boolean isDoc(String ext) {
		return EXT_DOC_STR.contains(ext + ',');
	}

	public static boolean isVideo(String ext) {
		return EXT_VIDEO_FFMPEG_STR.contains(ext + ',') || needMEncoder(ext);
	}

	public static boolean needMEncoder(String ext) {
		return EXT_VIDEO_MENCODER_STR.contains(ext + ',');
	}

	public CompletableFuture<ConvertTask> process(ConvertTask task) {
		CompletableFuture<ConvertTask> cf = new CompletableFuture<>();
		new Thread(() -> {
			String srcFile = task.getSrc();
			String dotExt = getExt(srcFile);
			String outFile = null;
			CmdHandler cmdHandler = null;
			FileOutputStream logStream = null;
			try {
				File logFile = new File(convertConfig.getLog() + srcFile + ".log");
				logStream = new FileOutputStream(logFile);
				if (isDoc(dotExt)) {
					outFile = srcFile + EXT_PDF;
					cmdHandler = doc2pdf(convertConfig.getSrc() + srcFile, convertConfig.getOut() + outFile, logStream);
					if (cmdHandler.getException() == null) {
						if (pdf2png(convertConfig.getOut() + outFile, convertConfig.getOut() + task.getSrc() + ".png")) {
							task.setImg(task.getSrc() + ".png");
						}
					}
				} else if (isVideo(dotExt)) {
					if (needMEncoder(dotExt)) {
						outFile = srcFile + EXT_AVI;
						cmdHandler = video2avi(convertConfig.getSrc() + srcFile, convertConfig.getOut() + outFile, logStream);
						task.setRet(cmdHandler.resultString());
						if (cmdHandler.getException() != null) {
							cf.complete(task);
							return;
						}
						srcFile = outFile;
					}
					cmdHandler = video2jpg(convertConfig.getSrc() + srcFile, convertConfig.getOut() + task.getSrc() + ".jpg", logStream);
					if (cmdHandler.getException() == null) {
						task.setImg(task.getSrc() + ".jpg");
					}
					outFile = task.getSrc() + EXT_MP4;
					cmdHandler = video2mp4(convertConfig.getSrc() + srcFile, convertConfig.getOut() + outFile, logStream);
				} else {
					task.setRet("-1:unsupport");
					cf.complete(task);
					return;
				}
			} catch (FileNotFoundException e) {
				e.printStackTrace();
			} finally {
				IOUtils.closeQuietly(logStream);
			}

			task.setOut(outFile);
			task.setRet(cmdHandler.resultString());
			if (cmdHandler.getException() != null) {
				task.setOut(null);
			}
			cf.complete(task);
		}).start();
		return cf;
	}

	CmdHandler doc2pdf(String srcFile, String outFile, OutputStream cmdOut) {
		CommandLine cmd = new CommandLine(convertConfig.getUnoconv());
		cmd.addArgument("-f");
		cmd.addArgument("pdf");
		cmd.addArgument("-o");
		cmd.addArgument(outFile);
		cmd.addArgument(srcFile);
		return CmdHelper.run(cmd, cmdOut, 0);
	}

	boolean pdf2png(String srcFile, String outFile) {
		try {
			File file = new File(srcFile);
			PDDocument doc;
			doc = PDDocument.load(file);
			PDFRenderer renderer = new PDFRenderer(doc);
			BufferedImage image = renderer.renderImageWithDPI(1, 144); // Windows native DPI
			BufferedImage tag = new BufferedImage(64, 64, BufferedImage.TYPE_INT_RGB);
			tag.getGraphics().drawImage(image, 0, 0, 64, 64, null);
			ImageIO.write(tag, "PNG", new File(outFile));
			return true;
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
	}

	CmdHandler video2mp4(String srcFile, String outFile, OutputStream cmdOut) {
		CommandLine cmd = new CommandLine(convertConfig.getFfmpeg());
		cmd.addArgument("-i");
		cmd.addArgument(srcFile);
		cmd.addArgument("-ar");
		cmd.addArgument("22050");
		cmd.addArgument("-vcodec");
		cmd.addArgument("libx264");
		cmd.addArgument("-q:v");
		cmd.addArgument("6");
		cmd.addArgument("-r");
		cmd.addArgument("25");
		cmd.addArgument("-flags");
		cmd.addArgument("+loop");
		cmd.addArgument("-crf");
		cmd.addArgument("24");
		cmd.addArgument("-bt");
		cmd.addArgument("256k");
		cmd.addArgument("-af");
		cmd.addArgument("volume=2");
		cmd.addArgument("-y");
		cmd.addArgument(outFile);
		return CmdHelper.run(cmd, cmdOut, 0);
	}

	CmdHandler video2jpg(String srcFile, String outFile, OutputStream cmdOut) {
		CommandLine cmd = new CommandLine(convertConfig.getFfmpeg());
		cmd.addArgument("-i");
		cmd.addArgument(srcFile);
		cmd.addArgument("-f");
		cmd.addArgument("image2");
		cmd.addArgument("-ss");
		cmd.addArgument("15");
		cmd.addArgument("-t");
		cmd.addArgument("0.001");
		cmd.addArgument("-s");
		cmd.addArgument("64x64");
		cmd.addArgument("-y");
		cmd.addArgument(outFile);
		return CmdHelper.run(cmd, cmdOut, 0);
	}

	CmdHandler video2avi(String srcFile, String outFile, OutputStream cmdOut) {
		CommandLine cmd = new CommandLine(convertConfig.getMencoder());
		cmd.addArgument(srcFile);
		cmd.addArgument("-oac");
		cmd.addArgument("mp3lame");
		cmd.addArgument("-lameopts");
		cmd.addArgument("preset=64");
		cmd.addArgument("-ovc");
		cmd.addArgument("xvid");
		cmd.addArgument("-xvidencopts");
		cmd.addArgument("bitrate=600");
		cmd.addArgument("-of");
		cmd.addArgument("avi");
		cmd.addArgument("-o");
		cmd.addArgument(outFile);
		return CmdHelper.run(cmd, cmdOut, 0);
	}
}

配置项

convert.src=/mnt/file/upload/
convert.out=/mnt/file/download/
convert.log=/mnt/file/download/
convert.ffmpeg=/opt/ffmpeg/ffmpeg
convert.unoconv=/opt/libreoffice6.4/unoconv.py
convert.mencoder=/usr/bin/mencoder
convert.notify=http://127.0.0.1:8080/api/convert/notify
@Data
@Configuration
@ConfigurationProperties(prefix = "convert")
public class ConvertConfig {
	String src;
	String out;
	String log;
	String ffmpeg;
	String unoconv;
	String mencoder;
	String notify;
}

依赖

pom.xml

<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-exec</artifactId>
	<version>1.3</version>
</dependency>
<dependency>
	<groupId>org.apache.pdfbox</groupId>
	<artifactId>pdfbox</artifactId>
	<version>2.0.9</version>
</dependency>
<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
</dependency>

ffmpeg && mencoder

wget https://download1.rpmfusion.org/free/el/rpmfusion-free-release-7.noarch.rpm
yum install epel-release
rpm -Uvh rpmfusion-free-release-7.noarch.rpm
wget https://johnvansickle.com/ffmpeg/builds/ffmpeg-git-amd64-static.tar.xz
yum install mencoder

libreoffice && unoconv

https://www.libreoffice.org/donate/dl/rpm-x86_64/6.4.4/zh-CN/LibreOffice_6.4.4_Linux_x86-64_rpm.tar.gz
https://codeload.github.com/unoconv/unoconv/zip/0.8.2
⚠️ **GitHub.com Fallback** ⚠️