TA的每日心情 | 开心 2021-3-12 23:18 |
---|
签到天数: 2 天 [LV.1]初来乍到
|
对远远大于内存的数据进行外排序,在多路比较的时候用败者树效率会更高。
package my.sort;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
/**
* 基于大数据量的外排序算法,分为二路贵宾和多路归并
* @author java2king
* @link http://blog.csdn.net/Java2King
*
*/
public class ExternalSort {
public static int ITEM_COUNT = 10000000; //总数
public static int BUFFER_SIZE = 1024*4*1000;// 一次缓冲读取
public static int FILE_COUNT = 1024*1000*1*4;// 每个文件的记录数1
public static File MAIN_FILE = new File("mainset");//要排序的文件
/**
* 二路归并
* @param file
* @return
* @throws IOException
*/
public File sort(File file) throws IOException {
ArrayList<File> files = split(file);
return process(files);
}
/**
* 多路归并
* @param file
* @throws IOException
*/
public void mSort(File file) throws IOException{
ArrayList<File> files = split(file);
multipleMerge(files);
}
// recursive method to merge the lists until we are left with a
// single merged list
private File process(ArrayList<File> list) throws IOException {
if (list.size() == 1) {
return list.get(0);
}
ArrayList<File> inter = new ArrayList<File>();
for (Iterator<File> itr = list.iterator(); itr.hasNext();) {
File one = itr.next();
if (itr.hasNext()) {
File two = itr.next();
inter.add(merge(one, two));
} else {
// return one;
inter.add(one);
}
}
return process(inter);
}
/**
* Splits the original file into a number of sub files.
*/
private ArrayList<File> split(File file) throws IOException {
ArrayList<File> files = new ArrayList<File>();
int[] buffer = new int[FILE_COUNT];
FileInputStream fr = new FileInputStream(file);
BufferedInputStream bin = new BufferedInputStream(fr,BUFFER_SIZE);
DataInputStream din=new DataInputStream(bin);
boolean fileComplete = false;
while (!fileComplete) {
int index = buffer.length;
for (int i = 0; i < buffer.length && !fileComplete; i++) {
try {
buffer = din.readInt();
} catch (Exception e) {
fileComplete = true;
index = i;
}
}
if (index != 0 && buffer[0] > -1) {
Arrays.sort(buffer, 0, index);
File f = new File("set" + new Random().nextInt());
// File temp = File.createTempFile("josp", ".tmp", f);
FileOutputStream writer = new FileOutputStream(f);
BufferedOutputStream bOutputStream = new BufferedOutputStream(writer);
DataOutputStream dout=new DataOutputStream(bOutputStream);
for (int j = 0; j < index; j++) {
dout.writeInt(buffer[j]);
}
dout.close();
bOutputStream.close();
writer.close();
files.add(f);
}
}
din.close();
bin.close();
fr.close();
return files;
}
/**
* 多路归并
* @param list
* @throws IOException
*/
private void multipleMerge(ArrayList<File> list) throws IOException {
int fileSize = list.size();
if(fileSize == 1){
return;
}
ArrayList<DataInputStream> dinlist = new ArrayList<DataInputStream>();
int[] ext = new int[fileSize];//比较数组
// File output = new File("multipleMerged");
FileOutputStream os = new FileOutputStream(MAIN_FILE);
BufferedOutputStream bout = new BufferedOutputStream(os);
DataOutputStream dout = new DataOutputStream(bout);
for (int i = 0; i < fileSize; i++) {
try {
dinlist.add(i, new DataInputStream(new BufferedInputStream(
new FileInputStream(list.get(i)), BUFFER_SIZE)));
} catch (Exception e) {
e.printStackTrace();
}
}
int index = 0;
for (int i = 0; i < fileSize; i++) {
try {
ext = dinlist.get(i).readInt();
} catch (Exception e) {
System.err.println("file_" + i + "为空");
ext = -1;
}
}
int count = fileSize;
int[] sum = new int[fileSize];
while (count > 1) {
index = getMinIndex(ext);
dout.writeInt(ext[index]);
sum[index]++;
try {
ext[index] = dinlist.get(index).readInt();
} catch (Exception e) {
ext[index] = -1;
count--;
dinlist.get(index).close();
// System.err.println(index + "空,写进:" +sum[index]);
}
}
int sIndex = getSIndex(ext);
dout.writeInt(ext[sIndex]);
while (true) {
try {
dout.writeInt(dinlist.get(sIndex).readInt());
} catch (Exception e) {
dinlist.get(sIndex).close();
break;
}
}
dout.close();
}
//找到剩下的最后一个文件输入流
public int getSIndex(int[] ext){
int result = 0;
for (int i = 0; i < ext.length; i++) {
if(ext!= -1){
result = i;
break;
}
}
return result;
}
//找到数据中最小的一个
public int getMinIndex(int[] ext){
int min = 2147483647;
int index = -1;
for (int i = 0; i < ext.length; i++) {
if(ext != -1 && ext < min){
min = ext;
index = i;
}
}
return index;
}
/**
* 二路归并
*
* @param one
* @param two
* @return
* @throws IOException
*/
private File merge(File one, File two) throws IOException {
FileInputStream fis1 = new FileInputStream(one);
FileInputStream fis2 = new FileInputStream(two);
BufferedInputStream bin1 = new BufferedInputStream(fis1,BUFFER_SIZE);
BufferedInputStream bin2 = new BufferedInputStream(fis2,BUFFER_SIZE);
DataInputStream din1=new DataInputStream(bin1);
DataInputStream din2=new DataInputStream(bin2);
File output = new File("merged" + new Random().nextInt());
FileOutputStream os = new FileOutputStream(output);
BufferedOutputStream bout = new BufferedOutputStream(os);
DataOutputStream dout=new DataOutputStream(bout);
int a = -1;//= din1.readInt();
int b = -1;//= din2.readInt();
boolean finished = false;
boolean emptyA = false;//
int flag = 0;
while (!finished) {
if (flag != 1) {
try {
a = din1.readInt();
} catch (Exception e) {
emptyA = true;
break;
}
}
if (flag != 2) {
try {
b = din2.readInt();
} catch (Exception e) {
emptyA = false;
break;
}
}
if(a > b){
dout.writeInt(b);
flag = 1;
}else if( a < b){
dout.writeInt(a);
flag = 2;
}else if(a == b){
dout.write(a);
dout.write(b);
flag = 0;
}
}
finished = false;
if(emptyA){
dout.writeInt(b);
while(!finished){
try {
b = din2.readInt();
} catch (Exception e) {
break;
}
dout.writeInt(b);
}
}else{
dout.writeInt(a);
while(!finished){
try {
a = din1.readInt();
} catch (Exception e) {
break;
}
dout.writeInt(a);
}
}
dout.close();
os.close();
bin1.close();
bin2.close();
bout.close();
return output;
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
Random random = new Random(System.currentTimeMillis());
FileOutputStream fw = new FileOutputStream(MAIN_FILE);
BufferedOutputStream bout = new BufferedOutputStream(fw);
DataOutputStream dout=new DataOutputStream(bout);
for (int i = 0; i < ITEM_COUNT; i++) {
int ger = random.nextInt();
ger = ger < 0 ? -ger : ger;
dout.writeInt(ger);
}
dout.close();
bout.close();
fw.close();
ExternalSort sort = new ExternalSort();
System.out.println("Original:");
long start = System.currentTimeMillis();
sort.mSort(MAIN_FILE);
long end = System.currentTimeMillis();
System.out.println((end - start)/1000 + "s");
recordFile((end - start)/1000 ,true);
}
private static void recordFile(long time,boolean isBuffer)
throws FileNotFoundException, IOException {
BufferedWriter bw = new BufferedWriter(new FileWriter("log",true));
bw.write("FILE_COUNT = "+FILE_COUNT+";对"+ ITEM_COUNT + "条数据 "+ ITEM_COUNT*4/(1024*1204) +"MB排序耗时:" + time + "s ");
if(isBuffer){
bw.write(" 使用缓冲:"+BUFFER_SIZE*4/(1024*1204) +"MB");
}
bw.newLine();
bw.close();
}
}
源码下载:http://file.javaxxz.com/2014/11/11/000319453.zip |
|