一种埋点存储方案

埋点存储的选择

Posted by River on January 11, 2022

前言

回顾Android基础技术,把经历过的一些有意思的项目在此记录一下。

背景

1、为了配合ProtoBuffer的数据格式,开发了一套存取byte数组的日志组件。
2、这套日志组件用了mmap的存储映射技术,组件名为Hlog。
3、优点在于跟通常的直接用系统数据库相比,避免了数据库读写分离带来的一些异常;同时性能要优于数据库。

Hlog核心实现存储篇

上层调用


// POOL_ID simply designates a dedicated file directory for this logger.
// The whole logData array is appended: offset 0, logData.length bytes.
HLogger.getLogger(POOL_ID).appendLog(logData, 0, logData.length);

appendLog方法实现

/**
 * Appends one log entry to the current log file.
 *
 * <p>The bytes {@code buffer[offset .. offset + count)} are wrapped in an {@link HLogRecord}
 * and handed to {@code currentLogFile.appendRecord}. The method is {@code synchronized}, so
 * calls on the same logger instance are serialized.
 *
 * <p>{@code pool} is simply a folder (directory) that holds the log files.
 *
 * <p>NOTE(review): the {@code HLogFile(File, int)} constructor shown later in this article
 * declares {@code IOException}; the handling is presumably elided from this excerpt.
 *
 * @param buffer source array holding the payload
 * @param offset index of the first payload byte in {@code buffer}
 * @param count  number of payload bytes
 */
public synchronized void appendLog(byte[] buffer, int offset, int count){

    HLogRecord record = new HLogRecord(buffer, offset, count);
    
    // Pseudo-code placeholder from the article ("a new file needs to be created");
    // the real condition is not shown in this excerpt.
    if(需要创建){
        // Size the new file to LOG_FILE_SIZE_LIMIT, or larger when the file header plus
        // this single record would not fit in that limit.
        currentLogFile = new HLogFile(pool,
                        Math.max(HLogFile.SIZE_OF_HEADER + record.getTotalLength(), HLogConstants.LOG_FILE_SIZE_LIMIT));
    }
    
    currentLogFile.appendRecord(record);
}

// 辅助分析之HLogRecord

/**
 * One log entry in its serialized form: a record header (magic, flags, payload length)
 * followed by the payload bytes.
 */
public class HLogRecord{

    /**
     * Builds a record from {@code length} bytes of {@code buffer} starting at {@code offset}.
     * Compression and encryption are both set to their "none" constants, and the serialized
     * form is produced immediately.
     *
     * @param buffer source array holding the payload
     * @param offset index of the first payload byte in {@code buffer}
     * @param length number of payload bytes
     */
    public HLogRecord(byte[] buffer, int offset, int length) {
        // Plain field assignments; all of them must be in place before serialization runs.
        this.compress = HLogConstants.LOG_COMPRESS_NONE;
        this.encrypt = HLogConstants.LOG_ENCRYPT_NONE;
        this.length = length;
        this.offset = offset;
        this.buffer = buffer;

        initByteBuffer();
    }

    /**
     * Serializes the record into a freshly allocated buffer of exactly header + payload size.
     * Written in order: magic (long), flags (int), payload length (int), payload bytes.
     */
    private void initByteBuffer() {
        byteBuffer = ByteBuffer.allocate(SIZE_OF_HEADER + length);
        byteBuffer.putLong(HEADER_MAGIC)
                .putInt(getFlags())
                .putInt(length)
                .put(buffer, offset, length);
    }

    /**
     * Returns the backing array of the serialized record (header followed by payload).
     * The array is not copied.
     */
    public byte[] getBytes() {
        final byte[] serialized = byteBuffer.array();
        return serialized;
    }
}

辅助分析之核心实现HLogFile

/**
 * A fixed-size log file that is read and written through a memory-mapped buffer.
 *
 * <p>Layout as written by {@link #initHeader()} and {@link #appendRecord(HLogRecord)}:
 * a file header (magic long at index 0, version int at 8, flags int at 12, record count int
 * at 16, content length int at 20, per the offsets given in the article) followed by the
 * serialized records, back to back.
 */
public class HLogFile{

    /**
     * Creates a new, uniquely named {@code .hl} file of {@code length} bytes inside
     * {@code pool} and maps it into memory.
     *
     * @param pool   directory that receives the file
     * @param length size in bytes of the file and of the mapping
     * @throws IOException if the file cannot be created, filled or mapped
     */
    public HLogFile(File pool, int length) throws IOException {
        // File name: 16 hex digits derived from the current time plus a random value in
        // [0, 1000); retry until the name does not collide with an existing file.
        String name = null;
        do {
            name = String.format(Locale.getDefault(),
                    "%s/%016x.hl", pool.getAbsolutePath(),
                    System.currentTimeMillis() * 1000 + random.nextInt(1000));
        } while (new File(name).exists());
        initInternal(name, length);
    }

    /**
     * Ensures the file exists, opens it read/write, maps it, and writes a fresh header
     * when the file was just created.
     *
     * @param name   absolute path of the log file
     * @param length size in bytes of the file and of the mapping
     * @throws IOException if the file cannot be created, opened or mapped
     */
    private void initInternal(String name, int length) throws IOException{
        boolean newFile = initFile(name, length);

        this.name = name.substring(name.lastIndexOf('/') + 1);
        maxLength = length;

        file = new RandomAccessFile(name, "rw");
        try {
            fileChannel = file.getChannel();
            // The mapping is established here; all later access goes through logBuffer.
            logBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, length);
        } catch (IOException | RuntimeException e) {
            // Mapping failed: do not leak the open file. Closing the RandomAccessFile
            // also closes the channel obtained from it.
            try {
                file.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            fileChannel = null;
            file = null;
            throw e;
        }

        isClosed = false;

        if (newFile) {
            initHeader();
        }
    }

    /**
     * Creates a zero-filled file of {@code length} bytes if no file exists at {@code name}.
     *
     * @return {@code true} if the file was created by this call, {@code false} if it
     *         already existed (in which case it is left untouched)
     * @throws IOException if the file cannot be created or written
     */
    private boolean initFile(String name, int length) throws IOException {
        File target = new File(name);
        if (target.exists()) {
            return false;
        }

        // FileOutputStream creates the file; try-with-resources closes the stream (and its
        // channel) even when the write fails.
        try (FileOutputStream fos = new FileOutputStream(target)) {
            ByteBuffer zeros = ByteBuffer.allocate(length);
            FileChannel out = fos.getChannel();
            // A single write() is not guaranteed to consume the whole buffer.
            while (zeros.hasRemaining()) {
                out.write(zeros);
            }
        }
        return true;
    }

    /** Writes the initial file header into the mapped buffer and flushes it. */
    private void initHeader() {
        if (logBuffer != null) {
            logBuffer.clear();
            // Read back at index 0.
            logBuffer.putLong(HEADER_MAGIC);
            // Read back at index 8.
            logBuffer.putInt(HEADER_VERSION);
            // Read back at index 12.
            logBuffer.putInt(HEADER_FLAGS);
            // Read back at index 16: record count, starts at 0.
            logBuffer.putInt(0);
            // Read back at index 20: content length, starts at 0.
            logBuffer.putInt(0);
            logBuffer.force();
        }
    }

    /**
     * Writes {@code record} after the existing content and updates the header counters.
     * Does nothing when the file is not mapped or the stored length is negative.
     *
     * @param record record whose serialized bytes are appended
     * @throws BufferOverflowException if the record does not fit in the remaining mapping
     */
    public void appendRecord(HLogRecord record) throws BufferOverflowException {
        if (logBuffer == null) {
            return;
        }

        int totalLength = getTotalLength();
        if (totalLength < 0) {
            return;
        }

        byte[] bytes = record.getBytes();
        logBuffer.position(totalLength);
        logBuffer.put(bytes);

        // Record count: starts at 0 and grows by 1 per appended record.
        int offset = SIZE_OF_HEADER - SIZE_OF_HEADER_RECORD_COUNT - SIZE_OF_HEADER_LENGTH;
        logBuffer.putInt(offset, logBuffer.getInt(offset) + 1);
        // Content length: starts at 0 and grows by bytes.length per appended record.
        offset = SIZE_OF_HEADER - SIZE_OF_HEADER_LENGTH;
        logBuffer.putInt(offset, logBuffer.getInt(offset) + bytes.length);

        logBuffer.force();
    }

    /**
     * Returns the number of record bytes stored so far, read from the header field at
     * {@code SIZE_OF_HEADER - SIZE_OF_HEADER_LENGTH} (index 20 per the article; compare
     * {@link #initHeader()}), or 0 when the file is not mapped.
     */
    public int getContentLength() {
        return logBuffer == null ? 0 : logBuffer.getInt(SIZE_OF_HEADER - SIZE_OF_HEADER_LENGTH);
    }

    /** Returns the size of the file header plus the record bytes stored so far. */
    public int getTotalLength() {
        return SIZE_OF_HEADER + getContentLength();
    }
}

上传数据前断开IO,释放mmap


            // Release the mapping, then the channel (fileChannel is a FileChannel).
            if (fileChannel != null) {
                unmapFile();
                try {
                    fileChannel.close();
                } catch (IOException e) {
                    HiLogger.w(HLogConstants.LOG_TAG, "Cannot close file");
                } finally {
                    // Drop the reference even when close() failed.
                    fileChannel = null;
                }
            }
            // Close the file on its own so that a channel failure above cannot leave it
            // open (file is a RandomAccessFile).
            if (file != null) {
                try {
                    file.close();
                } catch (IOException e) {
                    HiLogger.w(HLogConstants.LOG_TAG, "Cannot close file");
                } finally {
                    file = null;
                }
            }
            
            /**
             * Releases the memory mapping held by {@code logBuffer}, if any.
             *
             * <p>Looks up the non-public {@code unmap(MappedByteBuffer)} method declared on the
             * channel's runtime class and invokes it reflectively with the class object as
             * receiver. Any failure is logged as a warning; {@code logBuffer} is cleared
             * either way.
             */
            private void unmapFile() {
                if (logBuffer == null) {
                    return;
                }

                try {
                    Method unmap = fileChannel.getClass()
                            .getDeclaredMethod("unmap", MappedByteBuffer.class);
                    // getDeclaredMethod either returns a method or throws, so no null check.
                    unmap.setAccessible(true);
                    unmap.invoke(fileChannel.getClass(), logBuffer);
                } catch (Exception e) {
                    logBuffer = null;
                    HiLogger.w(HLogConstants.LOG_TAG, "Cannot invoke FileChannel::unmap()");
                }

                logBuffer = null;
            }

读取数据上传篇

上层调用

// Fetch the list of HLogBundle objects from the logger of this pool.
final List<HLogBundle> logBundles = HLogger.getLogger(POOL_ID).getLogBundles();

// Flatten the records of every bundle into a single list of byte[] entries.
// (The excerpt originally declared logBundles/logsBytes but read mLogBundles/logBytes;
// the names are unified here so the snippet is self-consistent.)
final List<byte[]> logBytes = new ArrayList<>();
for (HLogBundle bundle : logBundles) {
    final List<byte[]> bytes = bundle.getLogs();
    if (bytes != null) {
        logBytes.addAll(bytes);
    }
}

// Use the classes generated by the protobuf compiler to merge all entries into one
// request message, which is then serialized into a single byte[].
final JYUbtPb.DataRequest.Builder builder = JYUbtPb.DataRequest.newBuilder();
for (byte[] entry : logBytes) {
    try {
        builder.addInfo(JYUbtPb.LoggerModel.parseFrom(entry));
    } catch (InvalidProtocolBufferException e) {
        // An entry that does not parse as a LoggerModel is skipped; the rest are still sent.
        e.printStackTrace();
    }
}

final JYUbtPb.DataRequest dataRequest = builder.build();
return dataRequest.toByteArray();
            
// Upload through OkHttp.
// Media type used for the serialized protobuf request body.
private final MediaType MEDIA_TYPE_PB = MediaType.parse("application/protobuf");

// POST the serialized bytes (data) to serverUrl.
final Request request = new Request.Builder()
                    .url(serverUrl)
                    .post(RequestBody.create(MEDIA_TYPE_PB, data))
                    .header("connection", "keep-alive")
                    .header("Content-Type", "application/protobuf")
                    .header("Accept-Charset", "utf-8")
                    // The body is only declared as gzip here; the compression itself is done
                    // by the GzipRequestInterceptor installed in getOkHttpClient().
                    .header("Content-Encoding", "gzip")
                    .header("Is-Content-Encoded", "true")
                    .build();
                    
 // Asynchronous call; the Callback body is omitted in this excerpt.
 getOkHttpClient().newCall(request)
                    .enqueue(new Callback()

核心实现


public class HLogFile {

    /**
     * Reads back the payloads of the records stored in this file.
     *
     * <p>Starting right after the file header, up to {@code getRecordCount()} records are
     * decoded. Decoding stops early, keeping what was read so far, as soon as the file looks
     * corrupted: too few bytes left for a record header, a wrong record magic, or a payload
     * length larger than the bytes remaining. If any exception is raised, {@code logBuffer}
     * is set to {@code null} and an empty list is returned.
     *
     * @return the record payloads in file order; never {@code null}
     */
    public List<byte[]> getRecords() {
        ArrayList<byte[]> records = new ArrayList<>();
        try {
            if (logBuffer == null) {
                return records;
            }

            // Number of byte[] entries that were written.
            final int expectedCount = getRecordCount();
            // Records start right after the file header.
            logBuffer.position(SIZE_OF_HEADER);

            int index = 0;
            while (index < expectedCount) {
                // Sanity check: a complete record header must still fit.
                if (logBuffer.remaining() < HLogRecord.SIZE_OF_HEADER) {
                    break;
                }
                // Sanity check: every record starts with the record magic.
                final long magic = logBuffer.getLong();
                if (magic != HLogRecord.HEADER_MAGIC) {
                    break;
                }

                // Flags written by HLogRecord; read only to advance the position.
                logBuffer.getInt();
                final int payloadLength = logBuffer.getInt();
                // Sanity check: the announced payload must fit in what is left.
                if (payloadLength > logBuffer.remaining()) {
                    break;
                }

                final byte[] payload = new byte[payloadLength];
                logBuffer.get(payload);
                records.add(payload);
                index++;
            }

            return records;
        } catch (Exception e) {
            logBuffer = null;
            records.clear();
            return new ArrayList<>();
        }
    }
}

补充细节

  /**
   * Returns the shared HTTP client, building it on first use.
   *
   * <p>The client gzips request bodies through {@link GzipRequestInterceptor} and uses
   * 10-second connect, read and write timeouts. The method is {@code synchronized}, so the
   * client is built at most once per instance.
   */
  private synchronized Call.Factory getOkHttpClient() {
            if (this.okHttpClient != null) {
                return this.okHttpClient;
            }

            this.okHttpClient = new OkHttpClient.Builder()
                    .addInterceptor(new GzipRequestInterceptor())
                    .connectTimeout(10, TimeUnit.SECONDS)
                    .readTimeout(10, TimeUnit.SECONDS)
                    .writeTimeout(10, TimeUnit.SECONDS)
                    .build();
            return this.okHttpClient;
  }
  
  /**
   * Interceptor that gzip-compresses the body of every outgoing request that has one.
   * Requests without a body are passed through unchanged. No header is added or changed
   * here.
   */
  static class GzipRequestInterceptor implements Interceptor {

        @Override
        public Response intercept(Chain chain) throws IOException {
            final Request original = chain.request();
            final RequestBody payload = original.body();
            if (payload != null) {
                // Same method, same headers; only the body is swapped for its gzip wrapper.
                final Request compressed = original.newBuilder()
                        .method(original.method(), gzip(payload))
                        .build();
                return chain.proceed(compressed);
            }
            return chain.proceed(original);
        }

        /** Wraps {@code body} so that its bytes are gzip-compressed while being written. */
        private RequestBody gzip(final RequestBody body) {
            return new GzipBody(body);
        }

        /** Request body that delegates to another body and compresses its output. */
        private static final class GzipBody extends RequestBody {

            private final RequestBody delegate;

            GzipBody(RequestBody delegate) {
                this.delegate = delegate;
            }

            @Override
            public MediaType contentType() {
                return delegate.contentType();
            }

            @Override
            public long contentLength() {
                // The compressed size is not known in advance.
                return -1;
            }

            @Override
            public void writeTo(BufferedSink sink) throws IOException {
                final BufferedSink compressing = Okio.buffer(new GzipSink(sink));
                delegate.writeTo(compressing);
                // Closed only after a successful write, exactly as before.
                compressing.close();
            }
        }
    }

ByteBuffer的基本api解说

```
// Allocate a 12-byte ByteBuffer.
val bb = ByteBuffer.allocate(12)

bb.putInt(23)      // bytes 1-4 hold 23
    .putInt(24)    // bytes 5-8 hold 24
    .putInt(30)    // bytes 9-12 hold 30
    .rewind()      // reset position to 0

// Absolute reads: the position is not moved.
Log.i("xxxx,", "" + bb.getInt(0)) // 4 bytes starting at index 0 (1st byte): 23
Log.i("xxxx,", "" + bb.getInt(4)) // 4 bytes starting at index 4 (5th byte): 24
Log.i("xxxx,", "" + bb.getInt(8)) // 4 bytes starting at index 8 (9th byte): 30

// Relative reads: each read advances the position.
bb.rewind() // reset position to 0
Log.i("xxxx,", "当前索引=" + bb.position())
Log.i("xxxx,", "读取int索引前进四位," + bb.getInt())
Log.i("xxxx,", "当前索引=" + bb.position())
Log.i("xxxx,", "读取long索引前进八位," + bb.getLong())
Log.i("xxxx,", "当前索引=" + bb.position())
```

总结

  1. 本篇对Java基础要求比较高,核心围绕ByteBuffer展开。
  2. 其实Hlog只是保存protobuf二进制字节流的一种实现,读者也可以思考其他实现方式。